[jira] [Closed] (FLINK-7473) Possible Leak in GlobalWindows

2017-08-17 Thread Steve Jerman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Jerman closed FLINK-7473.
---
  Resolution: Not A Bug
Release Note: user error.

> Possible Leak in GlobalWindows
> --
>
> Key: FLINK-7473
> URL: https://issues.apache.org/jira/browse/FLINK-7473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: See attached project
>Reporter: Steve Jerman
> Attachments: timerIssue.zip
>
>
> Hi,
> I have been wrestling with an issue with GlobalWindows. It seems to leak 
> instances of InternalTimer.
> I can't tell if it's a bug or my code, so I created a 'minimal' project that 
> reproduces the issue...
> If you run the unit test in the attached project and then monitor the heap, 
> you will see that the number of InternalTimers continually increases. I added 
> code to explicitly delete them... it doesn't seem to help.
> If I comment out registerEventTimeTimer ... no leak :)
> My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo.
> Steve



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
The system class loader stuff was left over from an earlier version; I 
didn't mean to have that in there.

In the first version I copied the `ChildFirstClassLoader` from 
https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java#L78.
 However, when I added the end-to-end tests I noticed that Akka was no longer 
working correctly because it couldn't find some of its configuration. (I'm 
running that again so that I can post the actual error messages.)

I also changed the class loader code on the client because I thought people 
might be just as likely to run into clashing dependencies there as in 
operators, in which case they would suffer from the same problems. We're, after 
all, also creating a user-code class loader here.  I'm happy to be convinced 
otherwise, though.
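
For readers unfamiliar with the pattern, a minimal child-first class loader 
along the lines of the linked test class might look as follows. This is an 
illustrative sketch, not the code from this PR; real implementations usually 
also delegate core packages (e.g. `java.`, `org.apache.flink.`) to the parent 
first, which is omitted here.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch: resolve classes from the user-code jars first and
// only fall back to the parent class loader on a miss.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected synchronized Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException {
        // Reuse a class this loader has already defined.
        Class<?> c = findLoadedClass(name);
        if (c == null) {
            try {
                // Child-first: look in our own URLs before asking the parent.
                c = findClass(name);
            } catch (ClassNotFoundException e) {
                // Not in the user-code jars: fall back to the parent.
                c = super.loadClass(name, false);
            }
        }
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }
}
```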


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131754#comment-16131754
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
The system class loader stuff was left over from an earlier version; I 
didn't mean to have that in there.

In the first version I copied the `ChildFirstClassLoader` from 
https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java#L78.
 However, when I added the end-to-end tests I noticed that Akka was no longer 
working correctly because it couldn't find some of its configuration. (I'm 
running that again so that I can post the actual error messages.)

I also changed the class loader code on the client because I thought people 
might be just as likely to run into clashing dependencies there as in 
operators, in which case they would suffer from the same problems. We're, after 
all, also creating a user-code class loader here.  I'm happy to be convinced 
otherwise, though.


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[jira] [Commented] (FLINK-7473) Possible Leak in GlobalWindows

2017-08-17 Thread Steve Jerman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131745#comment-16131745
 ] 

Steve Jerman commented on FLINK-7473:
-

OK, I think I have figured out my issue.

When setting my timeout I used ctx.getCurrentWatermark() as the base ... it 
seems this can be at its maximum value. Changing it to use just the event 
timestamp seems to fix the leak.
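
To illustrate the difference with a hypothetical trigger (class name and 
timeout value are illustrative, not taken from the attached project): the timer 
should be registered relative to the element's timestamp, not the current 
watermark, which may already be at its maximum (e.g. at the end of input in a 
unit test), so that timer would never fire.

{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class TimeoutTrigger extends Trigger<Object, GlobalWindow> {

    private static final long TIMEOUT = 60_000L; // illustrative timeout

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            GlobalWindow window, TriggerContext ctx) throws Exception {
        // Problematic: ctx.getCurrentWatermark() may be Long.MAX_VALUE here,
        // so a timer based on it would never fire (and never be cleaned up):
        // ctx.registerEventTimeTimer(ctx.getCurrentWatermark() + TIMEOUT);

        // Instead, base the timer on the element's event timestamp:
        ctx.registerEventTimeTimer(timestamp + TIMEOUT);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        // any timers registered for this window should be deleted here
    }
}
{code}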



> Possible Leak in GlobalWindows
> --
>
> Key: FLINK-7473
> URL: https://issues.apache.org/jira/browse/FLINK-7473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: See attached project
>Reporter: Steve Jerman
> Attachments: timerIssue.zip
>
>
> Hi,
> I have been wrestling with an issue with GlobalWindows. It seems to leak 
> instances of InternalTimer.
> I can't tell if it's a bug or my code, so I created a 'minimal' project that 
> reproduces the issue...
> If you run the unit test in the attached project and then monitor the heap, 
> you will see that the number of InternalTimers continually increases. I added 
> code to explicitly delete them... it doesn't seem to help.
> If I comment out registerEventTimeTimer ... no leak :)
> My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo.
> Steve





[GitHub] flink issue #4557: [hotifx][streaming] Simplify state of TwoPhaseCommitSinkF...

2017-08-17 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4557
  
Ok, I will create JIRA issues for such things in the future.




[GitHub] flink issue #4557: [hotifx][streaming] Simplify state of TwoPhaseCommitSinkF...

2017-08-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4557
  
This LGTM, +1.
Merging this ...

Minor nitpick for the future: although this is a small fix, its nature 
still deserves a dedicated JIRA ticket. Hotfixes, AFAIK, are meant only for 
minor cosmetic refactorings or typos.




[GitHub] flink issue #4535: Eventhubs-support read from and write to Azure eventhubs

2017-08-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4535
  
@zhuganghuaonnet please also rebase this pull request (there are 
conflicting classes) and change the pull request title.




[GitHub] flink issue #4535: Eventhubs-support read from and write to Azure eventhubs

2017-08-17 Thread zhuganghuaonnet
Github user zhuganghuaonnet commented on the issue:

https://github.com/apache/flink/pull/4535
  
@Tzu-Li Tai I have created a JIRA: 
https://issues.apache.org/jira/browse/FLINK-7474. Please help to review. Thanks. :)


From: Tzu-Li Tai 
Sent: Wednesday, August 16, 2017 12:54:30 PM
To: apache/flink
Cc: zhuganghuaonnet; Mention
Subject: Re: [apache/flink] Eventhubs-support read from and write to Azure 
eventhubs (#4535)


@zhuganghuaonnet please let me know 
when you've opened the JIRA, and rebased this pull request onto the latest 
master. Also please remember to rename the pull request title (you can take a 
look at the other pull request titles as reference) and fill out the pull 
request template as necessary :)

Thanks a lot!






[jira] [Commented] (FLINK-7474) Flink supports consuming data from and sink data to Azure eventhubs

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131685#comment-16131685
 ] 

ASF GitHub Bot commented on FLINK-7474:
---

Github user zhuganghuaonnet commented on the issue:

https://github.com/apache/flink/pull/4535
  
@Tzu-Li Tai I have created a JIRA: 
https://issues.apache.org/jira/browse/FLINK-7474. Please help to review. Thanks. :)


From: Tzu-Li Tai 
Sent: Wednesday, August 16, 2017 12:54:30 PM
To: apache/flink
Cc: zhuganghuaonnet; Mention
Subject: Re: [apache/flink] Eventhubs-support read from and write to Azure 
eventhubs (#4535)


@zhuganghuaonnet please let me know 
when you've opened the JIRA, and rebased this pull request onto the latest 
master. Also please remember to rename the pull request title (you can take a 
look at the other pull request titles as reference) and fill out the pull 
request template as necessary :)

Thanks a lot!




> Flink supports consuming data from and sink data to Azure eventhubs 
> 
>
> Key: FLINK-7474
> URL: https://issues.apache.org/jira/browse/FLINK-7474
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 2.0.0, 1.4.0
> Environment: All platforms 
>Reporter: Joe
>  Labels: features
> Fix For: 2.0.0, 1.4.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Flink-azureeventhubs-connector:
> * Consuming data from eventhubs
> * Eventhubs offset snapshot and restore 
> * RecordWithTimestampAndPeriodicWatermark and 
> RecordWithTimestampAndPunctuatedWatermark supported 
> * Sink data to eventhubs
> * Perf counters: receivedCount, prepareSendCount, commitSendCount





[jira] [Created] (FLINK-7474) Flink supports consuming data from and sink data to Azure eventhubs

2017-08-17 Thread Joe (JIRA)
Joe created FLINK-7474:
--

 Summary: Flink supports consuming data from and sink data to Azure 
eventhubs 
 Key: FLINK-7474
 URL: https://issues.apache.org/jira/browse/FLINK-7474
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Affects Versions: 2.0.0, 1.4.0
 Environment: All platforms 
Reporter: Joe
 Fix For: 2.0.0, 1.4.0


Flink-azureeventhubs-connector:

* Consuming data from eventhubs
* Eventhubs offset snapshot and restore 
* RecordWithTimestampAndPeriodicWatermark and 
RecordWithTimestampAndPunctuatedWatermark supported 
* Sink data to eventhubs
* Perf counters: receivedCount, prepareSendCount, commitSendCount
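
As a rough structural sketch of what such a source could look like (this is 
not the proposed connector; the Event Hubs client call is a placeholder): 
offsets are kept in operator state via Flink's {{CheckpointedFunction}}, so 
they are snapshotted on checkpoints and restored on recovery.

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class EventHubsSourceSketch extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private long offset;                          // current consumption offset
    private transient ListState<Long> offsetState;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = pollEventHub(offset); // placeholder for the Azure client
            // Emit the record and advance the offset under the checkpoint lock,
            // so snapshots always see a consistent (record, offset) pair.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(record);
                offset++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("eventhub-offsets", Long.class));
        for (Long restored : offsetState.get()) {
            offset = restored; // restore the last snapshotted offset
        }
    }

    private String pollEventHub(long offset) {
        // Placeholder: a real connector would read from the Event Hubs client.
        return "event-" + offset;
    }
}
{code}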






[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131631#comment-16131631
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
Okay, Ted. I will trigger the QA soon.


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
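
The fix suggested above would simply pull the call into the synchronized block, 
roughly:

{code:java}
synchronized (bucketState.pendingFilesPerCheckpoint) {
  // handle and clear the map under the same lock, so no entries can be
  // removed while they are still being processed
  handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
  bucketState.pendingFilesPerCheckpoint.clear();
}
{code}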





[GitHub] flink issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingS...

2017-08-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4356
  
Okay, Ted. I will trigger the QA soon.




[jira] [Assigned] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-7446:
--

Assignee: Jark Wu

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131629#comment-16131629
 ] 

Xingcan Cui commented on FLINK-7446:


Now that the rowtime field is an existing one in the stream, shall we consider 
extracting the timestamps automatically instead of forcing users to assign 
them with a {{TimestampAssigner}} in advance? And what if the timestamps assigned 
in the {{StreamRecord}} and in the {{Row}} do not match?

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131628#comment-16131628
 ] 

ASF GitHub Bot commented on FLINK-5486:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4356
  
Run QA again?


> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.





[jira] [Commented] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131626#comment-16131626
 ] 

ASF GitHub Bot commented on FLINK-4534:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4482
  
lgtm


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
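
Following the description, the fix would wrap these iterations in the same 
synchronization used elsewhere, along these lines (loop bodies are abbreviated 
from the snippets above):

{code:java}
synchronized (state.bucketStates) {
  for (BucketState<T> bucketState : state.bucketStates.values()) {
    // ... restore logic ...
  }
}

synchronized (state.bucketStates) {
  for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
    closeCurrentPartFile(entry.getValue());
  }
}
{code}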





[GitHub] flink issue #4356: [FLINK-5486] Fix lacking of synchronization in BucketingS...

2017-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4356
  
Run QA again?




[GitHub] flink issue #4482: [FLINK-4534] Fix synchronization issue in BucketingSink

2017-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/4482
  
lgtm




[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131607#comment-16131607
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@fhueske @kl0u @dawidwys @wuchong Any feedback?


> Support the basic functionality of MATCH_RECOGNIZE
> --
>
> Key: FLINK-7062
> URL: https://issues.apache.org/jira/browse/FLINK-7062
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in the Flink SQL API, which includes support for the {{MEASURES}}, 
> {{PATTERN}} and {{DEFINE}} clauses. This would allow users to write basic CEP 
> use cases in SQL, like the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
> A.id AS aid,
> B.id AS bid,
> C.id AS cid
>   PATTERN (A B C)
>   DEFINE
> A AS A.name = 'a',
> B AS B.name = 'b',
> C AS C.name = 'c'
> ) AS T
> {code}





[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131616#comment-16131616
 ] 

Jark Wu commented on FLINK-7446:


[~fhueske] Great! I will make a pull request in the next few days.

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, the 
> rowtime field could either be appended or replace an existing field.





[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...

2017-08-17 Thread dianfu
Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4502
  
@fhueske @kl0u @dawidwys @wuchong Any feedback?




[GitHub] flink pull request #4560: Flink 7077

2017-08-17 Thread EronWright
GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/4560

Flink 7077

## Description

_This PR extends #4555 - please disregard commits from Aug 16, 2017._ 

[FLINK-7077] [mesos] Implement task release to support dynamic scaling

- SlotManager: fix for idleness tracking (`markIdle` shouldn't reset 
`idleSince` on every call)
- ResourceManager: change `stopWorker` method to use `ResourceID`
- ResourceManager: schedule callbacks from `ResourceManagerActions` 
onto main thread
- MesosResourceManager: implement `stopWorker`
- MesosResourceManager: fix for message routing from child actors to RM

This change added tests and can be verified as follows:
- `MesosResourceManagerTest::testStopWorker`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-7077

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4560


commit 7d257db84f4b9bf2a02d1375a04ff64516266186
Author: Wright, Eron 
Date:   2017-08-16T21:30:24Z

[FLINK-6630] Implement FLIP-6 MesosAppMasterRunner
[FLINK-6631] Implement FLIP-6 MesosTaskExecutorRunner

- bin: new entrypoints scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy 
entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, 
MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource 
profile is provided
- MesosResourceManager: config setting for taskmanager startup script 
(mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes

commit 4cbcde3095774f4ea6484a4cfd07df613fe08d30
Author: Wright, Eron 
Date:   2017-08-16T22:40:47Z

[FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMasterRunner

- removed `streaming-noop-3.graph` since it can be generated using 
`StreamingNoop` program.

commit 691d04efb0d1a44db8b0ef11a504355e6e3d49aa
Author: Wright, Eron 
Date:   2017-08-18T01:22:55Z

[FLINK-7077] [mesos] Implement task release to support dynamic scaling

- SlotManager: fix for idleness tracking (`markIdle` shouldn't reset 
`idleSince` on every call)
- ResourceManager: change `stopWorker` method to use `ResourceID`
- ResourceManager: schedule callbacks from `ResourceManagerActions` onto 
main thread
- MesosResourceManager: implement `stopWorker`
- MesosResourceManager: fix for message routing from child actors to RM






[jira] [Commented] (FLINK-7077) Implement task release to support dynamic scaling

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131597#comment-16131597
 ] 

ASF GitHub Bot commented on FLINK-7077:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/4560

Flink 7077

## Description

_This PR extends #4555 - please disregard commits from Aug 16, 2017._ 

[FLINK-7077] [mesos] Implement task release to support dynamic scaling

- SlotManager: fix for idleness tracking (`markIdle` shouldn't reset 
`idleSince` on every call)
- ResourceManager: change `stopWorker` method to use `ResourceID`
- ResourceManager: schedule callbacks from `ResourceManagerActions` 
onto main thread
- MesosResourceManager: implement `stopWorker`
- MesosResourceManager: fix for message routing from child actors to RM

This change added tests and can be verified as follows:
- `MesosResourceManagerTest::testStopWorker`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink FLINK-7077

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4560.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4560


commit 7d257db84f4b9bf2a02d1375a04ff64516266186
Author: Wright, Eron 
Date:   2017-08-16T21:30:24Z

[FLINK-6630] Implement FLIP-6 MesosAppMasterRunner
[FLINK-6631] Implement FLIP-6 MesosTaskExecutorRunner

- bin: new entrypoints scripts for flip-6
- ClusterEntrypoint: Refactor the shutdown method
- ClusterEntrypoint: Install default FileSystem (for parity with legacy 
entrypoints)
- ClusterEntrypoint: new MesosJobClusterEntrypoint, 
MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
- MesosServices: enhanced with artifactServer, localActorSystem
- MesosResourceManager: Fallback to old TM params when UNKNOWN resource 
profile is provided
- MesosResourceManager: config setting for taskmanager startup script 
(mesos.resourcemanager.tasks.taskmanager-cmd)
- test: added a 'noop' job graph for testing purposes

commit 4cbcde3095774f4ea6484a4cfd07df613fe08d30
Author: Wright, Eron 
Date:   2017-08-16T22:40:47Z

[FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMasterRunner

- removed `streaming-noop-3.graph` since it can be generated using 
`StreamingNoop` program.

commit 691d04efb0d1a44db8b0ef11a504355e6e3d49aa
Author: Wright, Eron 
Date:   2017-08-18T01:22:55Z

[FLINK-7077] [mesos] Implement task release to support dynamic scaling

- SlotManager: fix for idleness tracking (`markIdle` shouldn't reset 
`idleSince` on every call)
- ResourceManager: change `stopWorker` method to use `ResourceID`
- ResourceManager: schedule callbacks from `ResourceManagerActions` onto 
main thread
- MesosResourceManager: implement `stopWorker`
- MesosResourceManager: fix for message routing from child actors to RM




> Implement task release to support dynamic scaling
> -
>
> Key: FLINK-7077
> URL: https://issues.apache.org/jira/browse/FLINK-7077
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos, ResourceManager
>Reporter: Till Rohrmann
>
> In order to support dynamic scaling, the {{MesosResourceManager}} has to be 
> able to release Mesos tasks. We have to implement 
> {{MesosResourceManager#stopWorker}} for that.





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131587#comment-16131587
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comments @fhueske. I will pay more attention to the coding 
style. 

Actually, there are many ways to implement this feature. At first, I planned 
to override the `processWatermark` method in the subclass. However, the 
instance variable `timeServiceManager` that it needs is declared private. I'm 
not sure if this can be changed.
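
For illustration, one possible shape of such an operator, as a sketch only: it 
assumes that simply handing a delayed watermark to the superclass is 
acceptable, i.e. that timers may also be advanced by the delayed watermark.

{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DelayedWatermarkOperatorSketch<K, IN, OUT>
        extends KeyedProcessOperator<K, IN, OUT> {

    private final long watermarkDelay;

    public DelayedWatermarkOperatorSketch(ProcessFunction<IN, OUT> function, long watermarkDelay) {
        super(function);
        if (watermarkDelay < 0) {
            throw new IllegalArgumentException("The watermark delay must be non-negative.");
        }
        this.watermarkDelay = watermarkDelay;
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Hold the watermark back by the configured delay before it advances
        // timers and is forwarded downstream.
        super.processWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
    }
}
{code}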


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to those of the corresponding triggering watermarks. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-17 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comments @fhueske. I will pay more attention to the coding 
style. 

Actually, there are many ways to implement this feature. At first, I planned 
to override the `processWatermark` method in the subclass. However, the 
instance variable `timeServiceManager` that it needs is declared private. I'm 
not sure if this can be changed.




[jira] [Updated] (FLINK-7473) Possible Leak in GlobalWindows

2017-08-17 Thread Steve Jerman (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Jerman updated FLINK-7473:

Attachment: timerIssue.zip

Sample project showing the issue.

Look for org.apache.flink.streaming.api.operators.InternalTimer in the heap 
and you will see that its instance count continually grows.

Steve

> Possible Leak in GlobalWindows
> --
>
> Key: FLINK-7473
> URL: https://issues.apache.org/jira/browse/FLINK-7473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: See attached project
>Reporter: Steve Jerman
> Attachments: timerIssue.zip
>
>
> Hi,
> I have been wrestling with an issue with GlobalWindows. It seems to leak 
> instances of InternalTimer.
> I can't tell if it's a bug or my code, so I created a 'minimal' project that 
> reproduces the issue...
> If you run the unit test in the attached project and then monitor the heap, 
> you will see that the number of InternalTimers continually increases. I added 
> code to explicitly delete them... it doesn't seem to help.
> If I comment out registerEventTimeTimer ... no leak :)
> My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo.
> Steve





[jira] [Created] (FLINK-7473) Possible Leak in GlobalWindows

2017-08-17 Thread Steve Jerman (JIRA)
Steve Jerman created FLINK-7473:
---

 Summary: Possible Leak in GlobalWindows
 Key: FLINK-7473
 URL: https://issues.apache.org/jira/browse/FLINK-7473
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.3.2
 Environment: See attached project
Reporter: Steve Jerman


Hi,

I have been wrestling with an issue with GlobalWindows. It seems to leak 
instances of InternalTimer.

I can't tell if it's a bug or my code, so I created a 'minimal' project that 
reproduces the issue...

If you run the unit test in the attached project and then monitor the heap, 
you will see that the number of InternalTimers continually increases. I added 
code to explicitly delete them... it doesn't seem to help.

If I comment out registerEventTimeTimer ... no leak :)

My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo.

Steve





[jira] [Updated] (FLINK-7472) Release task managers gracefully

2017-08-17 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-7472:

Description: 
When a task manager is no longer needed (e.g. due to idle timeout in slot 
manager), the RM should gracefully stop it without spurious warnings.   This 
implies some actions should be taken before the TM is actually killed.   
Proactive steps include stopping the heartbeat monitor and sending a disconnect 
message.   

It is unclear whether `RM::closeTaskManagerConnection` method should be called 
proactively (when we plan to kill a TM), reactively (after the TM is killed), 
or both.  

  was:
When a task manager is no longer needed (e.g. due to idle timeout in slot 
manager), the RM should gracefully stop it without spurious warnings.   While 
implies some actions should be taken before the TM is actually killed.   
Proactive steps include stopping the heartbeat monitor and sending a disconnect 
message.   

It is unclear whether `RM::closeTaskManagerConnection` method should be called 
proactively (when we plan to kill a TM), reactively (after the TM is killed), 
or both.  


> Release task managers gracefully
> 
>
> Key: FLINK-7472
> URL: https://issues.apache.org/jira/browse/FLINK-7472
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>
> When a task manager is no longer needed (e.g. due to idle timeout in slot 
> manager), the RM should gracefully stop it without spurious warnings.   This 
> implies some actions should be taken before the TM is actually killed.   
> Proactive steps include stopping the heartbeat monitor and sending a 
> disconnect message.   
> It is unclear whether `RM::closeTaskManagerConnection` method should be 
> called proactively (when we plan to kill a TM), reactively (after the TM is 
> killed), or both.  





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131401#comment-16131401
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133842744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A {@link org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator} that supports
+  * holding back watermarks with a static delay.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
+  private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
+  private val watermarkDelay1: Long = 0L,
+  // The watermarkDelay2 is useless now
+  private var watermarkDelay2: Long = 0L)
--- End diff --

Since this is an internal class, we don't need to be concerned about 
changing the interface later. I'd suggest removing `watermarkDelay2` and adding 
it back later when we need it.


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to those of the corresponding triggering watermarks. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131403#comment-16131403
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843436
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, 
Integer, String, String](
+  new EmptyCoProcessFunction, 100)
+val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, 
Integer, String, String](
+  operator, new IntToStringKeySelector, new 
CoIdentityKeySelector[String],
+  BasicTypeInfo.STRING_TYPE_INFO)
+testHarness.setup()
+testHarness.open()
+testHarness.processWatermark1(new Watermark(101))
+testHarness.processWatermark2(new Watermark(202))
+testHarness.processWatermark1(new Watermark(103))
+testHarness.processWatermark2(new Watermark(204))
+val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+expectedOutput.add(new Watermark(1))
+expectedOutput.add(new Watermark(3))
+TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput,
--- End diff --

Please format parameter lists that do not fit into one line as follows:

```
TestHarnessUtil.assertOutputEquals(
  "Output was not correct.", 
  expectedOutput, 
  testHarness.getOutput)
```


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to those of the corresponding triggering watermarks. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131402#comment-16131402
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843667
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, 
Integer, String, String](
+  new EmptyCoProcessFunction, 100)
+val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, 
Integer, String, String](
+  operator, new IntToStringKeySelector, new 
CoIdentityKeySelector[String],
+  BasicTypeInfo.STRING_TYPE_INFO)
+testHarness.setup()
+testHarness.open()
+testHarness.processWatermark1(new Watermark(101))
+testHarness.processWatermark2(new Watermark(202))
+testHarness.processWatermark1(new Watermark(103))
+testHarness.processWatermark2(new Watermark(204))
+val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+expectedOutput.add(new Watermark(1))
+expectedOutput.add(new Watermark(3))
+TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput,
+  testHarness.getOutput)
+testHarness.close()
+  }
+
+  @Test
--- End diff --

use `@Test(expected = classOf[IllegalArgumentException])` instead of 
try-catch with `isInstanceOf`


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to those of the corresponding triggering watermarks. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131400#comment-16131400
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843854
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
--- End diff --

The comments on the other test apply here as well.


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal 
> to those of the corresponding triggering watermarks. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to the current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> have been emitted.





[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

2017-08-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843667
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, 
Integer, String, String](
+  new EmptyCoProcessFunction, 100)
+val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, 
Integer, String, String](
+  operator, new IntToStringKeySelector, new 
CoIdentityKeySelector[String],
+  BasicTypeInfo.STRING_TYPE_INFO)
+testHarness.setup()
+testHarness.open()
+testHarness.processWatermark1(new Watermark(101))
+testHarness.processWatermark2(new Watermark(202))
+testHarness.processWatermark1(new Watermark(103))
+testHarness.processWatermark2(new Watermark(204))
+val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+expectedOutput.add(new Watermark(1))
+expectedOutput.add(new Watermark(3))
+TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput,
+  testHarness.getOutput)
+testHarness.close()
+  }
+
+  @Test
--- End diff --

use `@Test(expected = classOf[IllegalArgumentException])` instead of 
try-catch with `isInstanceOf`




[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

2017-08-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843854
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedProcessOperatorWithWatermarkDelayTest extends TestLogger {
--- End diff --

comments of the other test apply here as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

2017-08-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133843436
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelayTest.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.util.{KeyedTwoInputStreamOperatorTestHarness, 
TestHarnessUtil}
+import org.apache.flink.util.{Collector, TestLogger}
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests {@link KeyedCoProcessOperatorWithWatermarkDelay}.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelayTest extends TestLogger {
+
+  @Test
+  def testHoldingBackWatermarks(): Unit = {
+val operator = new KeyedCoProcessOperatorWithWatermarkDelay[String, 
Integer, String, String](
+  new EmptyCoProcessFunction, 100)
+val testHarness = new KeyedTwoInputStreamOperatorTestHarness[String, 
Integer, String, String](
+  operator, new IntToStringKeySelector, new 
CoIdentityKeySelector[String],
+  BasicTypeInfo.STRING_TYPE_INFO)
+testHarness.setup()
+testHarness.open()
+testHarness.processWatermark1(new Watermark(101))
+testHarness.processWatermark2(new Watermark(202))
+testHarness.processWatermark1(new Watermark(103))
+testHarness.processWatermark2(new Watermark(204))
+val expectedOutput = new ConcurrentLinkedQueue[AnyRef]
+expectedOutput.add(new Watermark(1))
+expectedOutput.add(new Watermark(3))
+TestHarnessUtil.assertOutputEquals("Output was not correct.", 
expectedOutput,
--- End diff --

Please format parameter lists that do not fit into one line as follows:

```
TestHarnessUtil.assertOutputEquals(
  "Output was not correct.", 
  expectedOutput, 
  testHarness.getOutput)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

2017-08-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4530#discussion_r133842744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/operators/KeyedCoProcessOperatorWithWatermarkDelay.scala
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.operators
+
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+
+/**
+  * A {@link 
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator} that 
supports
+  * holding back watermarks with a static delay.
+  */
+class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
+  private val flatMapper: CoProcessFunction[IN1, IN2, OUT],
+  private val watermarkDelay1: Long = 0L,
+  // The watermarkDelay2 is useless now
+  private var watermarkDelay2: Long = 0L)
--- End diff --

Since this is an internal class, we don't need to be concerned about 
changing the interface later. I'd suggest removing `watermarkDelay2` and adding 
it later when we need it.
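
For context, a minimal sketch of the holding-back mechanism itself with the
single delay (assuming `timeServiceManager` stays accessible from
`AbstractStreamOperator`, as in the version under review):

```scala
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.streaming.api.watermark.Watermark

class KeyedCoProcessOperatorWithWatermarkDelay[KEY, IN1, IN2, OUT](
    flatMapper: CoProcessFunction[IN1, IN2, OUT],
    watermarkDelay: Long = 0L)
  extends KeyedCoProcessOperator[KEY, IN1, IN2, OUT](flatMapper) {

  require(watermarkDelay >= 0, "The watermark delay should be non-negative.")

  override def processWatermark(mark: Watermark): Unit = {
    // advance event time for the operator's timers as usual ...
    if (timeServiceManager != null) {
      timeServiceManager.advanceWatermark(mark)
    }
    // ... but emit the watermark held back by the configured delay
    output.emitWatermark(new Watermark(mark.getTimestamp - watermarkDelay))
  }
}
```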


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131364#comment-16131364
 ] 

Fabian Hueske commented on FLINK-6233:
--

Hi [~xccui], sorry for the late reply.

You are right that the clean-up of a window needs to be triggered by the 
watermark of the other input. Basically, the watermark tells the operator that 
no more records are expected from that input which could join with some records 
of the other input. Those records can then be discarded from the state.
A benefit of handling the watermarks of both inputs separately would be to 
immediately and completely join records from the slower input with the state of 
the faster input, instead of putting them into state to wait for further 
records from the faster input: we would know that those have all already been 
received due to the higher watermark.

Regarding your question
{quote}Did you mean that even for the rowtime join, the clean up timers should 
also use the ctx.registerProcessingTimeTimer() instead of 
ctx.registerEventTimeTimer()? I noticed that there's another issue (FLINK-7388) 
about the onTimer() method, but not sure if it's relative.{quote}

Yes, also event-time operators should implement the state retention time policy 
based on processing time. However, we don't need this for the windowed join 
operator. Windowed operators can (and must) automatically clear their complete 
state as time progresses. The state retention timers were added for operators 
that need to keep state forever to ensure correct semantics. By removing state, 
we give up correct semantics (in some cases) but ensure that the query does not 
leak state and can run for a very long time.
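
For readers, the retention pattern boils down to re-arming a processing-time
timer on every state access and clearing the state when the newest timer fires.
A minimal sketch (the state layout and retention interval are illustrative, not
the actual operator code):

{code}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// must run on a keyed stream; retentionMs is an assumed configuration value
class StateRetentionFunction(retentionMs: Long)
    extends ProcessFunction[(String, Int), (String, Int)] {

  private var lastValue: ValueState[Int] = _
  private var cleanupTime: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastValue = getRuntimeContext.getState(
      new ValueStateDescriptor[Int]("lastValue", classOf[Int]))
    cleanupTime = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("cleanupTime", classOf[java.lang.Long]))
  }

  override def processElement(
      value: (String, Int),
      ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    lastValue.update(value._2)
    // re-arm the processing-time cleanup timer on every access
    val cleanup = ctx.timerService().currentProcessingTime() + retentionMs
    cleanupTime.update(cleanup)
    ctx.timerService().registerProcessingTimeTimer(cleanup)
    out.collect(value)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[(String, Int), (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    // clear state only if no fresher timer has superseded this one
    if (cleanupTime.value() != null && cleanupTime.value().longValue() == timestamp) {
      lastValue.clear()
      cleanupTime.clear()
    }
  }
}
{code}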

I'm currently on vacation without a dev machine and don't have much time for 
reviewing. I'll try to take a look at your code in the next few days, but I 
can't promise anything.

Best, Fabian

> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only support inner join
> * The ON clause should include equi-join condition
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use a rowtime that is a system attribute. The time 
> condition only supports a bounded time range like {{o.rowtime BETWEEN 
> s.rowtime - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not an 
> unbounded one like {{o.rowtime  s.rowtime}}, and it must include the rowtime 
> attributes of both streams; {{o.rowtime between rowtime () and rowtime () + 
> 1}} should also not be supported.
> A row-time stream join will not be able to handle late data, because 
> inserting a row into a sorted order would shift all other computations. This 
> would be too expensive to maintain. Therefore, we will throw an error if a 
> user tries to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with stream join
> * Translation from Calcite's RelNode representation (LogicalJoin). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7472) Release task managers gracefully

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7472:
---

 Summary: Release task managers gracefully
 Key: FLINK-7472
 URL: https://issues.apache.org/jira/browse/FLINK-7472
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 


When a task manager is no longer needed (e.g. due to an idle timeout in the 
slot manager), the RM should gracefully stop it without spurious warnings.   
This implies that some actions should be taken before the TM is actually 
killed.   Proactive steps include stopping the heartbeat monitor and sending a 
disconnect message.   

It is unclear whether the `RM::closeTaskManagerConnection` method should be 
called proactively (when we plan to kill a TM), reactively (after the TM is 
killed), or both.  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131336#comment-16131336
 ] 

Fabian Hueske commented on FLINK-7471:
--

Are you proposing to compute all aggregates for each record completely from 
scratch?
How would we separate retracting and non-retracting aggregation functions from 
each other? 
Does this require a major refactoring of the code-generation and aggregation 
infrastructure that we implemented some months ago?

> Improve bounded OVER support non-retract method AGG
> ---
>
> Key: FLINK-7471
> URL: https://issues.apache.org/jira/browse/FLINK-7471
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently BOUNDED OVER WINDOW only supports AGGs that have a {{retract}} 
> method. This JIRA will add support for AGGs without a retract method.
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-17 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7471:
--

 Summary: Improve bounded OVER support non-retract method AGG
 Key: FLINK-7471
 URL: https://issues.apache.org/jira/browse/FLINK-7471
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng


Currently BOUNDED OVER WINDOW only supports AGGs that have a {{retract}} 
method. This JIRA will add support for AGGs without a retract method.
What do you think? [~fhueske]
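
For readers, a minimal sketch of what an AGG with a {{retract}} method looks 
like (a simple count; illustrative only):

{code}
import org.apache.flink.table.functions.AggregateFunction

class CountAcc { var count: Long = 0L }

// bounded OVER windows call retract() for rows that drop out of the window
// bound; AGGs without such a method are what this JIRA wants to support
class RetractableCount extends AggregateFunction[Long, CountAcc] {
  override def createAccumulator(): CountAcc = new CountAcc
  override def getValue(acc: CountAcc): Long = acc.count
  def accumulate(acc: CountAcc, value: Any): Unit = acc.count += 1
  def retract(acc: CountAcc, value: Any): Unit = acc.count -= 1
}
{code}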



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131296#comment-16131296
 ] 

Fabian Hueske commented on FLINK-7465:
--

I think an approximate count distinct would be a very nice feature.
AFAIK, count-min sketches are more commonly used and probably better suited for 
this use case. Similar to bloom filters, they are based on bitmaps and hashing 
and can be tuned for space and accuracy. 

[~sunjincheng121] How would you like to configure the accuracy, i.e., before 
the function is registered or when the function is used in a query?

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-)
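
(A minimal self-contained sketch of the add/query steps described above; the 
double-hashing scheme below is a common implementation choice and not 
necessarily the one Hive's BloomFilter uses.)

{code}
import java.util.BitSet

// sketch only: m bits, k hash functions via double hashing
class SimpleBloomFilter(m: Int, k: Int) {
  require(m > 0 && k > 0)
  private val bits = new BitSet(m)

  // derive k array positions from two base hashes
  private def positions(element: String): Seq[Int] = {
    val h1 = element.hashCode
    val h2 = java.util.Arrays.hashCode(element.getBytes("UTF-8"))
    (0 until k).map(i => math.abs((h1 + i * h2) % m))
  }

  // set the bits at all k positions to 1
  def add(element: String): Unit = positions(element).foreach(p => bits.set(p))

  // false: definitely not in the set; true: possibly in the set (false positive)
  def mightContain(element: String): Boolean = positions(element).forall(p => bits.get(p))
}
{code}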



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7447) Hope add more committer information to "Community & Project Info" page.

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131275#comment-16131275
 ] 

Fabian Hueske commented on FLINK-7447:
--

I would be fine with adding time zone information but agree with [~greghogan] 
that we shouldn't add organizations. 
People contribute to Apache projects as individuals and don't represent the 
organization they are working for.

> Hope add more committer information to "Community & Project Info" page.
> ---
>
> Key: FLINK-7447
> URL: https://issues.apache.org/jira/browse/FLINK-7447
> Project: Flink
>  Issue Type: Wish
>  Components: Project Website
>Reporter: Hai Zhou
>
> I wish to add "organization" and "time zone" information to the committer 
> introductions, and to use email addresses instead of Apache IDs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131268#comment-16131268
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @sunjincheng121, can you please update the PR description and fill out 
the PR template? 

The community decided to extend the PR template and to be more strict about 
the PR submission process a few weeks ago.
Thank you.


> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF `Func('a, 'b)`. So
> I want to add implicit conversion when calling UDFs. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> In this JIRA. only for TableAPI, And SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
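
(For illustration, the call this JIRA wants to make valid, using the {{Func}} 
UDF and schema from the description above; today the Int field 'b does not 
match the Long parameter and validation fails.)

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// table schema: (a: Int, b: Int, c: String)
val t = env.fromElements((1, 2, "x")).toTable(tEnv, 'a, 'b, 'c)

// Func.eval(a: Int, b: Long): with the proposed rule, the Int field 'b is
// implicitly widened to Long instead of failing validation
val result = t.select(Func('a, 'b))
{code}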



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...

2017-08-17 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @sunjincheng121, can you please update the PR description and fill out 
the PR template? 

The community decided to extend the PR template and to be more strict about 
the PR submission process a few weeks ago.
Thank you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131261#comment-16131261
 ] 

Fabian Hueske commented on FLINK-7446:
--

I think the existing {{DefinedRowtimeAttribute}} interface can be used for that 
as well. If the field name returned by 
{{DefinedRowtimeAttribute.getRowtimeAttribute()}} is included in the field 
names returned by the {{TableSource}}, we can use the existing field as the 
time indicator attribute.
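
For illustration, a minimal sketch of this idea, assuming the current 
{{DefinedRowtimeAttribute}} and {{StreamTableSource}} interfaces (field names 
and the stubbed stream are hypothetical):

{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource}
import org.apache.flink.types.Row

class OrderSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {

  // the schema already contains a Long field "ts"
  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
    Array("ts", "product"))

  override def getDataStream(env: StreamExecutionEnvironment): DataStream[Row] =
    ??? // actual source omitted in this sketch

  // "ts" is among the field names of getReturnType, so it would be used as the
  // existing time indicator field instead of appending a new one
  override def getRowtimeAttribute: String = "ts"
}
{code}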

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we could support defining 
> an existing field as the rowtime field. Just like when registering a 
> DataStream, the rowtime field can be appended but can also replace an 
> existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7470) Acquire RM leadership before registering with Mesos

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7470:
---

 Summary: Acquire RM leadership before registering with Mesos
 Key: FLINK-7470
 URL: https://issues.apache.org/jira/browse/FLINK-7470
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 



Mesos doesn't support fencing tokens in the scheduler protocol; it assumes 
external leader election among scheduler instances.   The last connection wins; 
prior connections for a given framework ID are closed.

The Mesos RM should not register as a framework until it has acquired RM 
leadership.   Evolve the ResourceManager as necessary.   One option is to 
introduce a ResourceManagerRunner that acquires leadership before starting the 
RM.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7469) Handle slot requests occurring before RM registration completes

2017-08-17 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-7469:

Description: 
*Description*
Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
registration for 10 seconds.  Meanwhile the registration may actually have 
succeeded in the RM.   Slot requests may then arrive at the TM while RM 
registration is incomplete.   

The current behavior appears to be that the TM honors the slot request.   
Please determine whether this is a feature or a bug.   If a feature, maybe a 
slot request should implicitly complete the registration.

*Example*
See the attached log, which shows a certain TM exhibiting the described behavior. 
The RM launched 12 TMs in parallel, evidently causing the RM to sluggishly 
respond to a couple of the TM registration requests.   From the logs we see 
that '00012' and '3' experienced a registration timeout but accepted a slot 
request anyway.

  was:

Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
registration for 10 seconds.  Meanwhile the registration may actually have 
succeeded in the RM.   Slot requests may then arrive at the TM while RM 
registration is incomplete.   

The current behavior appears to be that the TM honors the slot request.   
Please determine whether this is a feature or a bug.   If a feature, maybe a 
slot request should implicitly complete the registration.

See the attached log, which shows a certain TM exhibiting the described behavior.


> Handle slot requests occurring before RM registration completes
> --
>
> Key: FLINK-7469
> URL: https://issues.apache.org/jira/browse/FLINK-7469
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Priority: Minor
> Attachments: jm.log, taskmanager-3.log
>
>
> *Description*
> Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
> registration for 10 seconds.  Meanwhile the registration may actually have 
> succeeded in the RM.   Slot requests may then arrive at the TM while RM 
> registration is incomplete.   
> The current behavior appears to be that the TM honors the slot request.   
> Please determine whether this is a feature or a bug.   If a feature, maybe a 
> slot request should implicitly complete the registration.
> *Example*
> See the attached log, which shows a certain TM exhibiting the described behavior. 
>  The RM launched 12 TMs in parallel, evidently causing the RM to sluggishly 
> respond to a couple of the TM registration requests.   From the logs we see 
> that '00012' and '3' experienced a registration timeout but accepted a 
> slot request anyway.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-08-17 Thread Svend Vanderveken (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130917#comment-16130917
 ] 

Svend Vanderveken commented on FLINK-5633:
--

Hi all, 

I am hitting exactly the same issue, although I am not using the 
`SpecificDatumReader` directly but rather using Confluent's Avro deserializer. 
We opted for this one since it has the advantage of being integrated with the 
Schema registry, and thus validating the schemas at runtime. 

I am not the first person to encounter this exact situation; I found and 
commented on the PR below, which essentially suggests applying the fix 
mentioned by [~gcaliari] to that deserializer.

https://github.com/confluentinc/schema-registry/pull/509

[~StephanEwen], [~gcaliari], feel free to add comments on that PR if you think 
it should move forward. I think such an update would greatly help integrate 
Kafka and Flink through Avro and the Schema Registry. 


> ClassCastException: X cannot be cast to X when re-submitting a job.
> ---
>
> Key: FLINK-5633
> URL: https://issues.apache.org/jira/browse/FLINK-5633
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, YARN
>Affects Versions: 1.1.4
>Reporter: Giuliano Caliari
>Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>   at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>   at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> 

[jira] [Updated] (FLINK-7469) Handle slot requests occurring before RM registration completes

2017-08-17 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-7469:

Attachment: jm.log
taskmanager-3.log

> Handle slot requests occurring before RM registration completes
> --
>
> Key: FLINK-7469
> URL: https://issues.apache.org/jira/browse/FLINK-7469
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Eron Wright 
>Priority: Minor
> Attachments: jm.log, taskmanager-3.log
>
>
> Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
> registration for 10 seconds.  Meanwhile the registration may actually have 
> succeeded in the RM.   Slot requests may then arrive at the TM while RM 
> registration is incomplete.   
> The current behavior appears to be that the TM honors the slot request.   
> Please determine whether this is a feature or a bug.   If a feature, maybe a 
> slot request should implicitly complete the registration.
> See the attached log, which shows a certain TM exhibiting the described behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7469) Handle slot requests occurring before RM registration completes

2017-08-17 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7469:
---

 Summary: Handle slot requests occurring before RM registration 
completes
 Key: FLINK-7469
 URL: https://issues.apache.org/jira/browse/FLINK-7469
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Priority: Minor



Occasionally the TM-to-RM registration ask times out, causing the TM to pause 
registration for 10 seconds.  Meanwhile the registration may actually have 
succeeded in the RM.   Slot requests may then arrive at the TM while RM 
registration is incomplete.   

The current behavior appears to be that the TM honors the slot request.   
Please determine whether this is a feature or a bug.   If a feature, maybe a 
slot request should implicitly complete the registration.

See the attached log, which shows a certain TM exhibiting the described behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130810#comment-16130810
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I took only a very brief look at this, but I am not totally sure whether 
the `ChildFirstClassLoader` implementation is actually correct. Even if it is, 
it seems to do redundant work, like looking at the URLs twice (in the 
`findClass(name);` call and the `super.loadClass(name, resolve);` call).

We have a working version of a ChildFirstClassLoader here, why not use 
that? Is that implementation suboptimal?

https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java#L78

We should also never reference the System Classloader directly, it breaks 
all embedded setups or service architecture (OSGI) setups where you end up with 
hierarchical class loaders.

My feeling is also that this does many changes that may not be necessary, 
like change the setup of the client, packaged program, etc. Passing the 
configuration through everything makes this change rather involved.

I was wondering if it is not sufficient to simply let the TaskManager pass 
this as a flag to the library cache manager. Then you would not need to pass 
configs everywhere - the only ever config access is by the TaskManager or Task 
when it creates the classloader, and the config is available there anyways. 

Concerning class loader setup on the client - not sure if we should change 
this in the same PR. This is probably much less critical (the main method does 
not instantiate many of the dependencies) and that part changes so heavily with 
flip-6 already. Various setups may not even have separate classloaders on the 
client anyways, but everything is in the app class loader there.
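
For reference, the core of such a child-first loader (along the lines of the 
RocksDB test linked above) is roughly the following sketch, not the PR's code:

```scala
import java.net.{URL, URLClassLoader}

class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader)
    extends URLClassLoader(urls, parent) {

  override def loadClass(name: String, resolve: Boolean): Class[_] = synchronized {
    val c = Option(findLoadedClass(name)).getOrElse {
      try {
        findClass(name) // look at the child's URLs first ...
      } catch {
        case _: ClassNotFoundException =>
          super.loadClass(name, false) // ... and only then delegate to the parent
      }
    }
    if (resolve) resolveClass(c)
    c
  }
}
```

A production version would additionally always delegate JDK and framework base 
packages (e.g. `java.`, `scala.`) to the parent to avoid exactly the kind of 
clashes discussed here.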


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I took only a very brief look at this, but I am not totally sure whether 
the `ChildFirstClassLoader` implementation is actually correct. Even if it is, 
it seems to do redundant work, like looking at the URLs twice (in the 
`findClass(name);` call and the `super.loadClass(name, resolve);` call).

We have a working version of a ChildFirstClassLoader here, why not use 
that? Is that implementation suboptimal?

https://github.com/apache/flink/blob/fa11845b926f8371e9cee47775ca0e48176b686e/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java#L78

We should also never reference the System Classloader directly, it breaks 
all embedded setups or service architecture (OSGI) setups where you end up with 
hierarchical class loaders.

My feeling is also that this does many changes that may not be necessary, 
like change the setup of the client, packaged program, etc. Passing the 
configuration through everything makes this change rather involved.

I was wondering if it is not sufficient to simply let the TaskManager pass 
this as a flag to the library cache manager. Then you would not need to pass 
configs everywhere - the only ever config access is by the TaskManager or Task 
when it creates the classloader, and the config is available there anyways. 

Concerning class loader setup on the client - not sure if we should change 
this in the same PR. This is probably much less critical (the main method does 
not instantiate many of the dependencies) and that part changes so heavily with 
flip-6 already. Various setups may not even have separate classloaders on the 
client anyways, but everything is in the app class loader there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7468) Implement Netty sender backlog logic for credit-based

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130756#comment-16130756
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4559

[FLINK-7468][network]Implement Netty sender backlog logic for credit-based

## What is the purpose of the change

This PR is based on #4533, whose commits are also included so that Travis 
passes. Review the last commit for this PR's change.

Receivers should know how many buffers are available on the sender side 
(the backlog). The receivers use this information to decide how to distribute 
floating buffers. So the backlog is attached to `BufferResponse` by the sender 
as an absolute value after each buffer is transferred.

## Brief change log

  - *Adds the `getBacklog` method in `ResultSubpartitionView`*
  - *The `ResultSubpartition` maintains the backlog, increasing it when a 
buffer is added and decreasing it when a buffer is polled*

## Verifying this change

This change will be covered by test cases in the next PR.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7468

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4559


commit e35c1ff8066bf44344495d132a1092b9db3ef182
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for 
the floating buffers

commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f
Author: Zhijiang 
Date:   2017-08-14T06:30:47Z

[FLINK-7394][core]Implement basic InputChannel with free buffers,credit and 
backlog

commit 15fa828449d73f53042c57e9c5494d75ddee575f
Author: Zhijiang 
Date:   2017-08-10T05:29:13Z

[FLINK-7406][network]Implement Netty receiver incoming pipeline for 
credit-based

commit d0674244f15701863a5dd3f68b7274b3bd49c64d
Author: Zhijiang 
Date:   2017-08-12T14:13:25Z

[FLINK-7416][network] Implement Netty receiver outgoing pipeline for 
credit-based

commit 102f80916534719465cbbaf788ef09a57ca22879
Author: Zhijiang 
Date:   2017-08-17T11:38:45Z

[FLINK-7468][network]Implement Netty sender backlog logic for credit-based




> Implement Netty sender backlog logic for credit-based
> -
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only counts the 
> number of buffers in this subpartition, not the number of events. 
> The backlog is increased when a buffer is added to this subpartition and 
> decreased when a buffer is polled from it.
> The backlog is attached to {{BufferResponse}} by the sender as an absolute 
> value after each buffer is transferred.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4559: [FLINK-7468][network]Implement Netty sender backlo...

2017-08-17 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/4559

[FLINK-7468][network]Implement Netty sender backlog logic for credit-based

## What is the purpose of the change

This PR is based on #4533, whose commits are also included so that Travis 
passes. Review the last commit for this PR's change.

Receivers should know how many buffers are available on the sender side 
(the backlog). The receivers use this information to decide how to distribute 
floating buffers. So the backlog is attached to `BufferResponse` by the sender 
as an absolute value after each buffer is transferred.

## Brief change log

  - *Adds the `getBacklog` method in `ResultSubpartitionView`*
  - *The `ResultSubpartition` maintains the backlog, increasing it when a 
buffer is added and decreasing it when a buffer is polled*

## Verifying this change

This change will be covered by test cases in the next PR.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-7468

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4559


commit e35c1ff8066bf44344495d132a1092b9db3ef182
Author: Zhijiang 
Date:   2017-08-07T09:31:17Z

[FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for 
the floating buffers

commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f
Author: Zhijiang 
Date:   2017-08-14T06:30:47Z

[FLINK-7394][core]Implement basic InputChannel with free buffers,credit and 
backlog

commit 15fa828449d73f53042c57e9c5494d75ddee575f
Author: Zhijiang 
Date:   2017-08-10T05:29:13Z

[FLINK-7406][network]Implement Netty receiver incoming pipeline for 
credit-based

commit d0674244f15701863a5dd3f68b7274b3bd49c64d
Author: Zhijiang 
Date:   2017-08-12T14:13:25Z

[FLINK-7416][network] Implement Netty receiver outgoing pipeline for 
credit-based

commit 102f80916534719465cbbaf788ef09a57ca22879
Author: Zhijiang 
Date:   2017-08-17T11:38:45Z

[FLINK-7468][network]Implement Netty sender backlog logic for credit-based




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7468) Implement Netty sender backlog logic for credit-based

2017-08-17 Thread zhijiang (JIRA)
zhijiang created FLINK-7468:
---

 Summary: Implement Netty sender backlog logic for credit-based
 Key: FLINK-7468
 URL: https://issues.apache.org/jira/browse/FLINK-7468
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.4.0


This is a part of work for credit-based network flow control.

Receivers should know how many buffers are available on the sender side (the 
backlog). The receivers use this information to decide how to distribute 
floating buffers.

The {{ResultSubpartition}} maintains the backlog, which only counts the number 
of buffers in this subpartition, not the number of events. The backlog is 
increased when a buffer is added to this subpartition and decreased when a 
buffer is polled from it.

The backlog is attached to {{BufferResponse}} by the sender as an absolute 
value after each buffer is transferred.
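
(Sketch of the bookkeeping described above; the class and method names are 
illustrative, not the PR's actual code.)

{code}
class SubpartitionBacklog {
  private var backlog = 0

  // called when a buffer (not an event) is added to the subpartition
  def onBufferAdded(): Unit = synchronized { backlog += 1 }

  // called when a buffer is polled; the returned absolute value is what the
  // sender attaches to the outgoing BufferResponse
  def onBufferPolled(): Int = synchronized { backlog -= 1; backlog }
}
{code}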




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4380: Time sort with offset/fetch without retraction

2017-08-17 Thread rtudoran
Github user rtudoran commented on the issue:

https://github.com/apache/flink/pull/4380
  
@fhueske
I have updated the PR.
Based on the previous discussions I have:
1) integrated the support for offset and fetch when time is ascending, as 
you suggested (having a counter within the process function that restricts 
the output; see the sketch below)
2) added back the implementation for offset and fetch with retraction and 
bound it to queries with descending time. It is important to remark that the 
way I intended to enable this behavior is that at every new time unit the 
offset/fetch restriction is reapplied from scratch. That means that in an 
example such as this (getting the first 2 elements):
t1: event 1, event 2, event 3 => output is event 1 and event 2
t2: event 4 => output is retract (events 1&2), emit event 4
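
For readers following along, a rough sketch of the counter in (1) — the names 
are illustrative, not the PR's code, and the function must run on a keyed 
stream:

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// emits only the rows in (offset, offset + fetch] per key, in arrival order
class OffsetFetchFunction[T](offset: Long, fetch: Long) extends ProcessFunction[T, T] {

  private var seen: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("seen", classOf[java.lang.Long]))
  }

  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    val count = if (seen.value() == null) 0L else seen.value().longValue()
    seen.update(count + 1)
    // skip the first `offset` rows, then emit at most `fetch` rows
    if (count >= offset && count < offset + fetch) {
      out.collect(value)
    }
  }
}
```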
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7466) Add a flink connector for Apache RocketMQ

2017-08-17 Thread yukon (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130375#comment-16130375
 ] 

yukon commented on FLINK-7466:
--

Hi [~jark]

Many thanks for the reminder.

> Add a flink connector for Apache RocketMQ
> -
>
> Key: FLINK-7466
> URL: https://issues.apache.org/jira/browse/FLINK-7466
> Project: Flink
>  Issue Type: New Feature
>Reporter: yukon
>
> Hi Flink community:
> Flink is really a great stream processing framework. I would like to 
> contribute a flink-rocketmq-connector; if you think it's acceptable, I will 
> submit a pull request soon.
> Apache RocketMQ is a distributed messaging and streaming platform with low 
> latency, high performance and reliability, trillion-level capacity and 
> flexible scalability. For more info, please refer to http://rocketmq.apache.org/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7466) Add a flink connector for Apache RocketMQ

2017-08-17 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130361#comment-16130361
 ] 

Jark Wu commented on FLINK-7466:


Thank you for writing and contributing a RocketMQ connector. New Flink 
connectors are now developed in the [Apache Bahir 
repository|https://github.com/apache/bahir-flink]. Please continue to make use 
of the flink-dev and flink-user mailing lists as you work on the new connector.

> Add a flink connector for Apache RocketMQ
> -
>
> Key: FLINK-7466
> URL: https://issues.apache.org/jira/browse/FLINK-7466
> Project: Flink
>  Issue Type: New Feature
>Reporter: yukon
>
> Hi Flink community:
> Flink is really a great stream processing framework. I would like to 
> contribute a flink-rocketmq-connector; if you think it's acceptable, I will 
> submit a pull request soon.
> Apache RocketMQ is a distributed messaging and streaming platform with low 
> latency, high performance and reliability, trillion-level capacity and 
> flexible scalability. For more info, please refer to http://rocketmq.apache.org/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...

2017-08-17 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/4553
  
@twalthr Thanks for doing the work of merging this to the other branches as 
well. I've triggered a build for all branches. While 1.2 worked and the warning 
is now available online 
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/), older branches 
don't build properly:

https://ci.apache.org/builders/flink-docs-release-1.0/builds/545
https://ci.apache.org/builders/flink-docs-release-1.1/builds/391


Do you have a clue what we can do there? Also pinging @rmetzger who has 
some experience with the Apache infrastructure...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130295#comment-16130295
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4238
  
ok, rebased again, this time on top of #4558, which adds one small fix for an 
issue that could cause hanging tests. I also addressed the comments you had, 
thanks.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4238: [FLINK-7057][blob] move BLOB ref-counting from LibraryCac...

2017-08-17 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4238
  
ok, rebased again, this time on top of #4558, which adds one small fix for an 
issue that could cause hanging tests. I also addressed the comments you had, 
thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130291#comment-16130291
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688568
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is 
called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

oh, sure


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688568
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is 
called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

oh, sure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688433
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* job fails due to a limited restart policy).
+*/
+   @Test
+   public void testBlobServerCleanupFailedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FAILS);
+   }
+
+   /**
+

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130289#comment-16130289
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133688433
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* 

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130284#comment-16130284
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133687826
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<BlobKey>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133687826
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<BlobKey>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we cannot 

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130273#comment-16130273
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133685671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -148,7 +149,10 @@
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
 
-   /** Blob cache manager used across jobs */
+   /** Blob server used across jobs */
+   private final BlobServer blobServer;
--- End diff --

Actually, I'd rather let the services involved in the life cycle of the 
BLOBs know what they are dealing with than add empty `register/releaseJob` 
methods to the `BlobService`/`BlobServer`. Once we move the `BlobServer` out of 
the `JobMaster`, we can change the class of the parameters and also remove the 
`register/releaseJob` calls.
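
Illustratively, the rejected alternative would look like the sketch below
(a hypothetical `BlobServiceSketch`, not the actual interface):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.blob.BlobKey;

    import java.io.File;
    import java.io.IOException;

    // Widening the shared interface forces no-op implementations on one side.
    interface BlobServiceSketch {
        File getFile(JobID jobId, BlobKey key) throws IOException;

        void registerJob(JobID jobId); // meaningful for the BlobCache only --
        void releaseJob(JobID jobId);  // would be empty no-ops on the BlobServer
    }

Keeping concrete parameter types avoids carrying these dead methods around
until the `BlobServer` moves out of the `JobMaster`.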


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133685671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -148,7 +149,10 @@
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
 
-   /** Blob cache manager used across jobs */
+   /** Blob server used across jobs */
+   private final BlobServer blobServer;
--- End diff --

Actually, I'd rather let the services involved in the life cycle of the 
BLOBs know what they are dealing with than add empty `register/releaseJob` 
methods to the `BlobService`/`BlobServer`. Once we move the `BlobServer` out of 
the `JobMaster`, we can change the class of the parameters and also remove the 
`register/releaseJob` calls.




[jira] [Commented] (FLINK-7056) add API to allow job-related BLOBs to be stored

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130209#comment-16130209
 ] 

ASF GitHub Bot commented on FLINK-7056:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4558#discussion_r133675837
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -139,30 +139,35 @@ private static BlobKey prepareTestFile(File file) 
throws IOException {
 * the specified buffer.
 * 
 * @param inputStream
-*the input stream returned from the GET operation
+*the input stream returned from the GET operation (will be 
closed by this method)
 * @param buf
 *the buffer to compare the input stream's data to
 * @throws IOException
 * thrown if an I/O error occurs while reading the input stream
 */
static void validateGet(final InputStream inputStream, final byte[] 
buf) throws IOException {
--- End diff --

ok, why not... ;)


> add API to allow job-related BLOBs to be stored
> ---
>
> Key: FLINK-7056
> URL: https://issues.apache.org/jira/browse/FLINK-7056
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> To ease cleanup, we will make job-related BLOBs be reflected in the blob 
> storage so that they may be removed along with the job. This adds the jobId 
> to many methods similar to the previous code from the {{NAME_ADDRESSABLE}} 
> mode.
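
Concretely, the jobId threads through the BLOB API roughly as in the tests
discussed in this thread (a sketch; `payloadBytes`, `blobCache`,
`serverAddress` and `config` are assumed from the surrounding test setup):

    JobID jobId = new JobID();
    BlobClient client = new BlobClient(serverAddress, config);

    // store a job-related BLOB: it lives under the job's storage directory
    BlobKey key = client.put(jobId, payloadBytes);
    client.close();

    // retrieve it through a cache, again scoped by the job
    File localFile = blobCache.getFile(jobId, key);

    // once the job terminates, its whole directory can be removed in one go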





[GitHub] flink pull request #4558: [FLINK-7056][tests][hotfix] make sure the client a...

2017-08-17 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4558#discussion_r133675837
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -139,30 +139,35 @@ private static BlobKey prepareTestFile(File file) 
throws IOException {
 * the specified buffer.
 * 
 * @param inputStream
-*the input stream returned from the GET operation
+*the input stream returned from the GET operation (will be 
closed by this method)
 * @param buf
 *the buffer to compare the input stream's data to
 * @throws IOException
 * thrown if an I/O error occurs while reading the input stream
 */
static void validateGet(final InputStream inputStream, final byte[] 
buf) throws IOException {
--- End diff --

ok, why not... ;)




[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130197#comment-16130197
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670563
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<BlobKey>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+  

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130195#comment-16130195
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133668912
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
--- End diff --

Let's extend from the `TestLogger` here.
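
In code that is a one-line change (assuming the usual
`org.apache.flink.util.TestLogger` base class, which logs test names and
failures around each run):

    import org.apache.flink.util.TestLogger;

    public class BlobCacheCleanupTest extends TestLogger {
        // ... tests unchanged ...
    }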


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673650
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+* job fails due to a limited restart policy).
+*/
+   @Test
+   public void testBlobServerCleanupFailedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FAILS);
+   }
+
+   /**
  

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130202#comment-16130202
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673650
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
+
+   @Rule
+   public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+*/
+   private enum TestCase {
+   JOB_FINISHES_SUCCESSFULLY,
+   JOB_IS_CANCELLED,
+   JOB_FAILS,
+   JOB_SUBMISSION_FAILS
+   }
+
+   /**
+* Test cleanup for a job that finishes ordinarily.
+*/
+   @Test
+   public void testBlobServerCleanupFinishedJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_FINISHES_SUCCESSFULLY);
+   }
+
+   /**
+* Test cleanup for a job which is cancelled after submission.
+*/
+   @Test
+   public void testBlobServerCleanupCancelledJob() throws IOException {
+   testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+   }
+
+   /**
+* Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole

[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130198#comment-16130198
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133674703
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is 
called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

I think we should make `blocking` `volatile`. Otherwise we might miss 
changes and end up in a deadlock. 
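
A sketch of the suggested fix (the `invoke()`/`unblock()` bodies here are
illustrative; the point is the `volatile` keyword, which makes the flag
update visible across threads):

    public class FailingBlockingInvokable extends AbstractInvokable {
        // volatile: the blocking loop runs in a task thread while unblock()
        // is called from the test thread -- without volatile the update may
        // never become visible and the task could hang forever
        private static volatile boolean blocking = true;

        @Override
        public void invoke() throws Exception {
            while (blocking) {
                Thread.sleep(50L); // re-check the flag periodically
            }
            throw new RuntimeException("expected test failure");
        }

        public static void unblock() {
            blocking = false;
        }
    }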


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130196#comment-16130196
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670196
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<BlobKey>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+  

[jira] [Commented] (FLINK-7056) add API to allow job-related BLOBs to be stored

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130194#comment-16130194
 ] 

ASF GitHub Bot commented on FLINK-7056:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4558#discussion_r133674886
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -139,30 +139,35 @@ private static BlobKey prepareTestFile(File file) 
throws IOException {
 * the specified buffer.
 * 
 * @param inputStream
-*the input stream returned from the GET operation
+*the input stream returned from the GET operation (will be 
closed by this method)
 * @param buf
 *the buffer to compare the input stream's data to
 * @throws IOException
 * thrown if an I/O error occurs while reading the input stream
 */
static void validateGet(final InputStream inputStream, final byte[] 
buf) throws IOException {
--- End diff --

`s/validateGet/validateGetAndClose/g` - to avoid surprises for the function's 
users :)
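
A sketch of what the renamed helper could look like (the read loop is a plain
re-implementation for illustration; the point is the new name plus closing in
`finally`, so the stream is released even when the comparison fails):

    static void validateGetAndClose(final InputStream inputStream, final byte[] buf)
            throws IOException {
        try {
            final byte[] received = new byte[buf.length];
            int off = 0;
            while (off < received.length) {
                final int read = inputStream.read(received, off, received.length - off);
                if (read < 0) {
                    throw new EOFException("unexpected end of GET stream");
                }
                off += read;
            }
            assertArrayEquals(buf, received); // same contents as uploaded
        } finally {
            inputStream.close(); // close in all cases, as the new name promises
        }
    }

(this assumes `java.io.EOFException` and `org.junit.Assert.assertArrayEquals`
in addition to the imports already present in the test class)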


> add API to allow job-related BLOBs to be stored
> ---
>
> Key: FLINK-7056
> URL: https://issues.apache.org/jira/browse/FLINK-7056
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>
> To ease cleanup, we will make job-related BLOBs be reflected in the blob 
> storage so that they may be removed along with the job. This adds the jobId 
> to many methods similar to the previous code from the {{NAME_ADDRESSABLE}} 
> mode.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130200#comment-16130200
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133671238
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 ---
@@ -45,6 +42,18 @@
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link BlobLibraryCacheManager}.
+ */
 public class BlobLibraryCacheManagerTest {
--- End diff --

Let's extend from `TestLogger` here.


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130201#comment-16130201
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673260
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
--- End diff --

`TestLogger`


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.





[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130199#comment-16130199
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133669690
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<BlobKey>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
--- End diff --

Maybe we could close the client in the finally block as well if something 
goes wrong.
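
In code, the suggestion amounts to something like this (sketch against the
test above):

    BlobClient bc = null;
    try {
        bc = new BlobClient(serverAddress, config);
        keys.add(bc.put(jobId, buf));
        buf[0] += 1;
        keys.add(bc.put(jobId, buf));
    } finally {
        if (bc != null) {
            bc.close(); // also runs if one of the put() calls above throws
        }
    }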


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133668912
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
--- End diff --

Let's extend from the `TestLogger` here.
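
A sketch of what that amounts to (assuming the usual `org.apache.flink.util.TestLogger` base class):

```java
import org.apache.flink.util.TestLogger;

// TestLogger logs the name of each test and any errors, which makes
// failures much easier to trace in the CI logs
public class BlobCacheCleanupTest extends TestLogger {
    // existing rules and tests stay unchanged
}
```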


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670563
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we 

[GitHub] flink pull request #4558: [FLINK-7056][tests][hotfix] make sure the client a...

2017-08-17 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4558#discussion_r133674886
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -139,30 +139,35 @@ private static BlobKey prepareTestFile(File file) 
throws IOException {
 * the specified buffer.
 * 
 * @param inputStream
-*the input stream returned from the GET operation
+*the input stream returned from the GET operation (will be 
closed by this method)
 * @param buf
 *the buffer to compare the input stream's data to
 * @throws IOException
 * thrown if an I/O error occurs while reading the input stream
 */
static void validateGet(final InputStream inputStream, final byte[] 
buf) throws IOException {
--- End diff --

`s/validateGet/validateGetAndClose/g` - to avoid surprises for the function's 
users :)
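
For illustration, a hedged sketch of the renamed helper (the body is illustrative; the point is that the method now owns and closes the stream, which the new name makes explicit):

```java
static void validateGetAndClose(final InputStream inputStream, final byte[] buf)
        throws IOException {
    try {
        byte[] received = new byte[buf.length];
        int bytesReceived = 0;
        while (bytesReceived < received.length) {
            final int read = inputStream.read(
                received, bytesReceived, received.length - bytesReceived);
            if (read < 0) {
                throw new EOFException("unexpected end of GET stream");
            }
            bytesReceived += read;
        }
        assertEquals(-1, inputStream.read()); // stream must be exhausted
        assertArrayEquals(buf, received);
    } finally {
        inputStream.close(); // closed even if validation fails
    }
}
```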


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133669690
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
--- End diff --

Maybe we could close the client in the finally block as well if something 
goes wrong.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133670196
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the 
{@link BlobCache}.
+ */
+public class BlobCacheCleanupTest {
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Tests that {@link BlobCache} cleans up after calling {@link 
BlobCache#releaseJob(JobID)}.
+*/
+   @Test
+   public void testJobCleanup() throws IOException, InterruptedException {
+
+   JobID jobId = new JobID();
+   List<BlobKey> keys = new ArrayList<>();
+   BlobServer server = null;
+   BlobCache cache = null;
+
+   final byte[] buf = new byte[128];
+
+   try {
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+   server = new BlobServer(config, new VoidBlobStore());
+   InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+   BlobClient bc = new BlobClient(serverAddress, config);
+   cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+   keys.add(bc.put(jobId, buf));
+   buf[0] += 1;
+   keys.add(bc.put(jobId, buf));
+
+   bc.close();
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   // register once
+   cache.registerJob(jobId);
+
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(0, jobId, cache);
+
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   // register again (let's say, from another thread or so)
+   cache.registerJob(jobId);
+   for (BlobKey key : keys) {
+   cache.getFile(jobId, key);
+   }
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing once, nothing should change
+   cache.releaseJob(jobId);
+
+   assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+   checkFileCountForJob(2, jobId, server);
+   checkFileCountForJob(2, jobId, cache);
+
+   // after releasing the second time, the job is up for 
deferred cleanup
+   cache.releaseJob(jobId);
+
+   // because we 

[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133674703
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/FailingBlockingInvokable.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * Task which blocks until the (static) {@link #unblock()} method is 
called and then fails with an
+ * exception.
+ */
+public class FailingBlockingInvokable extends AbstractInvokable {
+   private static boolean blocking = true;
--- End diff --

I think we should make `blocking` `volatile`. Otherwise we might miss 
changes and end up in a deadlock. 
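
For illustration, a minimal sketch of the suggested change (the polling loop is an assumption about how the task blocks, not the actual test code):

```java
public class FailingBlockingInvokable extends AbstractInvokable {
    // volatile establishes a happens-before edge between the thread calling
    // unblock() and the task thread polling the flag
    private static volatile boolean blocking = true;

    @Override
    public void invoke() throws Exception {
        while (blocking) {
            Thread.sleep(50L);
        }
        throw new RuntimeException("expected test failure");
    }

    public static void unblock() {
        blocking = false;
    }
}
```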


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133671238
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 ---
@@ -45,6 +42,18 @@
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link BlobLibraryCacheManager}.
+ */
 public class BlobLibraryCacheManagerTest {
--- End diff --

Let's extend from `TestLogger` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4238: [FLINK-7057][blob] move BLOB ref-counting from Lib...

2017-08-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4238#discussion_r133673260
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase {
--- End diff --

`TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7057) move BLOB ref-counting from LibraryCacheManager to BlobCache

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130190#comment-16130190
 ] 

ASF GitHub Bot commented on FLINK-7057:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4558

[FLINK-7057][tests][hotfix] make sure the client and a created InputStream 
are closed

## What is the purpose of the change

This fixes some stalling tests: if the client is not closed, the input stream 
of a GET operation was not fully read, and the server has not yet sent all data 
packets, the server may still hold the read lock and block any writing 
operations (also see FLINK-7467). As a result, a user reported that 
`BlobServerDeleteTest#testDeleteSingleByBlobKey()` hangs.
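
For illustration, a minimal sketch of the pattern applied throughout the tests (names like `client`, `jobId` and `blobKey` are illustrative; the assumption is that `BlobClient#get` returns an `InputStream` over the blob's bytes):

```java
// drain the stream completely, then close it, so the server-side GET
// finishes and releases the read lock
try (InputStream is = client.get(jobId, blobKey)) {
    final byte[] chunk = new byte[4096];
    while (is.read(chunk) >= 0) {
        // discard the data; we only need the server to finish sending
    }
}
```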

## Brief change log

- (fully read and) close all `InputStream` instances returned by 
`BlobClient#get()`
- add missing `close` calls to `BlobClient` uses in the tests

## Verifying this change

This change affects and fixes existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4558


commit f80025773b071a67fdcbe612d58ee16ee7cbbc52
Author: Nico Kruber 
Date:   2017-08-17T10:04:09Z

[FLINK-7057][tests][hotfix] make sure the client and a created InputStream 
are closed

If not and the server has not yet sent all data packets, it may still 
occupy the
read lock and block any writing operations (also see FLINK-7467).




> move BLOB ref-counting from LibraryCacheManager to BlobCache
> 
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4558: [FLINK-7057][tests][hotfix] make sure the client a...

2017-08-17 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4558

[FLINK-7057][tests][hotfix] make sure the client and a created InputStream 
are closed

## What is the purpose of the change

This fixes some stalling tests: if the client is not closed, the input stream 
of a GET operation was not fully read, and the server has not yet sent all data 
packets, the server may still hold the read lock and block any writing 
operations (also see FLINK-7467). As a result, a user reported that 
`BlobServerDeleteTest#testDeleteSingleByBlobKey()` hangs.

## Brief change log

- (fully read and) close all `InputStream` instances returned by 
`BlobClient#get()`
- add missing `close` calls to `BlobClient` uses in the tests

## Verifying this change

This change affects and fixes existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-6916-7057-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4558.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4558


commit f80025773b071a67fdcbe612d58ee16ee7cbbc52
Author: Nico Kruber 
Date:   2017-08-17T10:04:09Z

[FLINK-7057][tests][hotfix] make sure the client and a created InputStream 
are closed

If not and the server has not yet sent all data packets, it may still 
occupy the
read lock and block any writing operations (also see FLINK-7467).




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7467) a slow client on the BlobServer may block writing operations

2017-08-17 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7467:
--

 Summary: a slow client on the BlobServer may block writing 
operations
 Key: FLINK-7467
 URL: https://issues.apache.org/jira/browse/FLINK-7467
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, Network
Affects Versions: 1.3.2, 1.3.1, 1.3.0, 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber


In FLINK-6020, a locking mechanism was introduced to isolate reading from 
writing operations. This, however, lets a slow client slow down all write 
operations, which may not be desirable at this level.
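
For illustration, a minimal sketch of the failure mode (hypothetical code, not the actual BlobServer implementation):

{code}
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BlobStoreSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void get() {
        lock.readLock().lock();
        try {
            // streaming a blob to a slow client can hold this lock for minutes
        } finally {
            lock.readLock().unlock();
        }
    }

    void put() {
        lock.writeLock().lock(); // blocks while any get() is still streaming
        try {
            // store the new blob
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}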



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130168#comment-16130168
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133667042
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
--- End diff --

The changes in the next commit are not enough. The `Map` type should also be 
changed in `SharedBuffer#extractPatterns`. Also, please provide tests for this 
behaviour (returning results in `Pattern` order) so that it cannot be changed 
by mistake.
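
For illustration, a hedged sketch of the point about the `Map` type (the event names and values are made up, and `LinkedHashMap` is one way to get the ordering guarantee, not necessarily the one the PR will use):

```java
// HashMap iteration order is unspecified; LinkedHashMap preserves insertion
// order, so the extracted patterns can be returned in Pattern order
Map<String, List<String>> match = new LinkedHashMap<>();
match.put("start", Collections.singletonList("a"));
match.put("middle", Collections.singletonList("b"));
match.put("end", Collections.singletonList("c"));
// iterating over match.entrySet() now always yields start, middle, end
```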


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to `CEP` class, which takes a new 
> parameter as AfterMatchSKipStrategy.
> The new API may looks like this
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's how 
> the CEP library currently behaves.
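> 
> For illustration, usage of the proposed API might then look like this (a 
> sketch; the factory-method names on `AfterMatchSkipStrategy` are 
> assumptions, not a settled API):
> {code}
> AfterMatchSkipStrategy skip = AfterMatchSkipStrategy.skipPastLastRow();
> PatternStream<Event> matches = CEP.pattern(input, pattern, skip);
> {code}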



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7466) Add a flink connector for Apache RocketMQ

2017-08-17 Thread yukon (JIRA)
yukon created FLINK-7466:


 Summary: Add a flink connector for Apache RocketMQ
 Key: FLINK-7466
 URL: https://issues.apache.org/jira/browse/FLINK-7466
 Project: Flink
  Issue Type: New Feature
Reporter: yukon


Hi Flink community:

Flink is really a great stream processing framework. I would like to contribute 
a flink-rocketmq-connector; if you think it's acceptable, I will submit a pull 
request soon.

Apache RocketMQ is a distributed messaging and streaming platform with low 
latency, high performance and reliability, trillion-level capacity and flexible 
scalability. For more info, please refer to http://rocketmq.apache.org/



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133667042
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
--- End diff --

The changes in the next commit are not enough. The `Map` type should also be 
changed in `SharedBuffer#extractPatterns`. Also, please provide tests for this 
behaviour (returning results in `Pattern` order) so that it cannot be changed 
by mistake.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7461) Remove Backwards compatibility for Flink 1.1 from Flink 1.4

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130146#comment-16130146
 ] 

ASF GitHub Bot commented on FLINK-7461:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4550
  
I have pushed an updated version with the changes that @zentol suggested 
and the additional cleanups suggested by @dawidwys.

@dawidwys I removed the code that you mentioned, but it is still possible 
that there is more dead code in CEP (or even other places).


> Remove Backwards compatibility for Flink 1.1 from Flink 1.4
> ---
>
> Key: FLINK-7461
> URL: https://issues.apache.org/jira/browse/FLINK-7461
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> This issue tracks the removal of Flink 1.1 backwards compatibility from Flink 
> 1.4. This step is helpful for further developments because it will remove 
> many old code paths and special cases. In particular, we can drop all 
> handling for non-partitionable state, i.e. state that was created with the 
> old {{Checkpointed}} interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4550: [FLINK-7461] Remove Backwards compatibility with <= Flink...

2017-08-17 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4550
  
I have pushed an updated version with the changes that @zentol suggested 
and the additional cleanups suggested by @dawidwys.

@dawidwys I removed the code that you mentioned, but it is still possible 
that there is more dead code in CEP (or even other places).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7461) Remove Backwards compatibility for Flink 1.1 from Flink 1.4

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130141#comment-16130141
 ] 

ASF GitHub Bot commented on FLINK-7461:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4550#discussion_r133663601
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ---
@@ -122,6 +124,22 @@ protected RocksDBStateBackend getStateBackend() throws 
IOException {
return backend;
}
 
+   // small safety net for instance cleanups, so that no native objects 
are left
--- End diff --

This change actually belongs to the previous commit, so I will put it back 
there.


> Remove Backwards compatibility for Flink 1.1 from Flink 1.4
> ---
>
> Key: FLINK-7461
> URL: https://issues.apache.org/jira/browse/FLINK-7461
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> This issue tracks the removal of Flink 1.1 backwards compatibility from Flink 
> 1.4. This step is helpful for further developments because it will remove 
> many old code paths and special cases. In particular, we can drop all 
> handling for non-partitionable state, i.e. state that was created with the 
> old {{Checkpointed}} interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4550: [FLINK-7461] Remove Backwards compatibility with <...

2017-08-17 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4550#discussion_r133663601
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 ---
@@ -122,6 +124,22 @@ protected RocksDBStateBackend getStateBackend() throws 
IOException {
return backend;
}
 
+   // small safety net for instance cleanups, so that no native objects 
are left
--- End diff --

This change actually belongs to the previous commit, so I will put it back 
there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7461) Remove Backwards compatibility for Flink 1.1 from Flink 1.4

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130142#comment-16130142
 ] 

ASF GitHub Bot commented on FLINK-7461:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4550#discussion_r133663689
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---
@@ -49,27 +49,8 @@ public static long getStateSize(StateObject handle) {
 * @throws Exception exception that is a collection of all suppressed 
exceptions that were caught during iteration
 */
public static void bestEffortDiscardAllStateObjects(
-   Iterable<? extends StateObject> handlesToDiscard) throws Exception {
-
-   if (handlesToDiscard != null) {
-   Exception exception = null;
-
-   for (StateObject state : handlesToDiscard) {
-
-   if (state != null) {
-   try {
-   state.discardState();
-   }
-   catch (Exception ex) {
-   exception = 
ExceptionUtils.firstOrSuppressed(ex, exception);
-   }
-   }
-   }
-
-   if (exception != null) {
-   throw exception;
-   }
-   }
+   Iterable<? extends StateObject> handlesToDiscard) throws Exception {
+   
LambdaUtil.applyToAllWhileSuppressingExceptions(handlesToDiscard, 
StateObject::discardState);
--- End diff --

Not strictly, but I would like to keep it.
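
For reference, a hedged sketch of what `LambdaUtil.applyToAllWhileSuppressingExceptions` presumably does, mirroring the loop removed above (the `ThrowingConsumer` functional interface is an assumption):

```java
public static <T> void applyToAllWhileSuppressingExceptions(
        Iterable<T> inputs,
        ThrowingConsumer<T> consumer) throws Exception {

    if (inputs != null && consumer != null) {
        Exception exception = null;

        for (T input : inputs) {
            if (input != null) {
                try {
                    consumer.accept(input);
                } catch (Exception ex) {
                    // keep the first exception, suppress the rest
                    exception = ExceptionUtils.firstOrSuppressed(ex, exception);
                }
            }
        }

        if (exception != null) {
            throw exception;
        }
    }
}
```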


> Remove Backwards compatibility for Flink 1.1 from Flink 1.4
> ---
>
> Key: FLINK-7461
> URL: https://issues.apache.org/jira/browse/FLINK-7461
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> This issue tracks the removal of Flink 1.1 backwards compatibility from Flink 
> 1.4. This step is helpful for further developments because it will remove 
> many old code paths and special cases. In particular, we can drop all 
> handling for non-partitionable state, i.e. state that was created with the 
> old {{Checkpointed}} interface.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

