[jira] [Commented] (FLINK-2566) FlinkTopologyContext not populated completely

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14746405#comment-14746405
 ] 

ASF GitHub Bot commented on FLINK-2566:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/1135

[FLINK-2566] FlinkTopologyContext not populated completely

  - extended FlinkTopologyContext to be populated with all supported attributes
  - added JUnit test
  - updated README.md
additionally: module restructuring for a cleaner package structure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink flink-2566-topologyContext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1135


commit 07473990fcbe06cff2305afcd78d5aef74895736
Author: mjsax 
Date:   2015-09-15T21:59:31Z

[FLINK-2566] FlinkTopologyContext not populated completely
  - extended FlinkTopologyContext to be populated with all supported attributes
  - added JUnit test
  - updated README.md
additionally: module restructuring for a cleaner package structure




> FlinkTopologyContext not populated completely
> -
>
> Key: FLINK-2566
> URL: https://issues.apache.org/jira/browse/FLINK-2566
> Project: Flink
>  Issue Type: Improvement
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently FlinkTopologyContext is not populated completely. It only contains 
> enough information to make the WordCount example work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2566] FlinkTopologyContext not populate...

2015-09-15 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/1135

[FLINK-2566] FlinkTopologyContext not populated completely

  - extended FlinkTopologyContext to be populated with all supported attributes
  - added JUnit test
  - updated README.md
additionally: module restructuring for a cleaner package structure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink flink-2566-topologyContext

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1135


commit 07473990fcbe06cff2305afcd78d5aef74895736
Author: mjsax 
Date:   2015-09-15T21:59:31Z

[FLINK-2566] FlinkTopologyContext not populated completely
  - extended FlinkTopologyContext to be populated with all supported attributes
  - added JUnit test
  - updated README.md
additionally: module restructuring for a cleaner package structure




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Hits

2015-09-15 Thread mfahimazizi
Github user mfahimazizi commented on the pull request:

https://github.com/apache/flink/pull/765#issuecomment-140633835
  
Hi @vasia, thank you.
For this algorithm, we need to calculate edge values inside the
VertexUpdateFunction, so I will try to build my own delta iteration.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2595) Unclosed JarFile may leak resource in ClassLoaderUtilsTest

2015-09-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-2595:
--
Description: 
Here is related code:
{code}
try {
new JarFile(validJar.getAbsolutePath());
}
catch (Exception e) {
e.printStackTrace();
fail("test setup broken: cannot create a valid 
jar file");
}
{code}
When exception happens, the JarFile instance is not closed.

  was:
Here is related code:
{code}
try {
new JarFile(validJar.getAbsolutePath());
}
catch (Exception e) {
e.printStackTrace();
fail("test setup broken: cannot create a valid 
jar file");
}
{code}

When exception happens, the JarFile instance is not closed.


> Unclosed JarFile may leak resource in ClassLoaderUtilsTest
> --
>
> Key: FLINK-2595
> URL: https://issues.apache.org/jira/browse/FLINK-2595
> Project: Flink
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> try {
> new JarFile(validJar.getAbsolutePath());
> }
> catch (Exception e) {
> e.printStackTrace();
> fail("test setup broken: cannot create a 
> valid jar file");
> }
> {code}
> When exception happens, the JarFile instance is not closed.
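
For illustration, a minimal sketch of how the test setup could avoid the leak, e.g. with 
try-with-resources (an editorial example, not the committed fix):
{code}
// the JarFile is closed automatically, even if the validation check throws
try (JarFile jar = new JarFile(validJar.getAbsolutePath())) {
    // successfully opening the file is enough to validate the test setup
}
catch (Exception e) {
    e.printStackTrace();
    fail("test setup broken: cannot create a valid jar file");
}
{code}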



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2649) Potential resource leak in JarHelper#unjar()

2015-09-15 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-2649:
--
Description: 
{code}
dest = new BufferedOutputStream(fos, BUFFER_SIZE);
while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
dest.write(data, 0, count);
}
dest.flush();
dest.close();
{code}

The close() of dest should be enclosed in finally block.

  was:
{code}
dest = new BufferedOutputStream(fos, BUFFER_SIZE);
while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
dest.write(data, 0, count);
}
dest.flush();
dest.close();
{code}
The close() of dest should be enclosed in finally block.


> Potential resource leak in JarHelper#unjar()
> 
>
> Key: FLINK-2649
> URL: https://issues.apache.org/jira/browse/FLINK-2649
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> {code}
> dest = new BufferedOutputStream(fos, BUFFER_SIZE);
> while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
> dest.write(data, 0, count);
> }
> dest.flush();
> dest.close();
> {code}
> The close() of dest should be enclosed in finally block.
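
For illustration, one way the copy loop could be restructured so that dest is always 
closed (a sketch reusing the variables from the snippet above, not the actual patch):
{code}
dest = new BufferedOutputStream(fos, BUFFER_SIZE);
try {
    while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
        dest.write(data, 0, count);
    }
    dest.flush();
}
finally {
    // always release the stream, even if read/write/flush throws
    dest.close();
}
{code}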



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2566) FlinkTopologyContext not populated completely

2015-09-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated FLINK-2566:
---
Summary: FlinkTopologyContext not populated completely  (was: 
FlinkTopologyContext no populated completely)

> FlinkTopologyContext not populated completely
> -
>
> Key: FLINK-2566
> URL: https://issues.apache.org/jira/browse/FLINK-2566
> Project: Flink
>  Issue Type: Improvement
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently FlinkTopologyContext is not populated completely. It only contains 
> enough information to make the WordCount example work.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Daniel Blazevski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14746552#comment-14746552
 ] 

Daniel Blazevski commented on FLINK-1745:
-

Thanks!

Pulled and built Chiwan's branch, and made some further very minor changes
to his test to familiarize myself more with DataSets.

Dan





> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2669) Execution contains non-serializable field

2015-09-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744914#comment-14744914
 ] 

Stephan Ewen commented on FLINK-2669:
-

The serializability of the execution graph is tricky right now.

It is properly serializable after calling {{prepareForArchiving()}} and can be 
stored in serialized form. It is not serializable before that.
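
In other words, a caller that needs to ship the graph would roughly have to archive it 
first (a hypothetical sketch, assuming the {{InstantiationUtil}} helper; not code from 
this thread):
{code}
// the graph only becomes serializable once it has been prepared for archiving
executionGraph.prepareForArchiving();
byte[] serializedGraph = InstantiationUtil.serializeObject(executionGraph);
{code}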

> Execution contains non-serializable field
> -
>
> Key: FLINK-2669
> URL: https://issues.apache.org/jira/browse/FLINK-2669
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Ufuk Celebi
>Priority: Minor
>
> Execution is {{Serializable}}, but non-transient field {{assignedResource}} 
> of type {{SimpleSlot}} is not.
> I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job 
> manager, which returns the {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2669) Execution contains non-serializable field

2015-09-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744915#comment-14744915
 ] 

Stephan Ewen commented on FLINK-2669:
-

This is by (admittedly somewhat tricky) design...

> Execution contains non-serializable field
> -
>
> Key: FLINK-2669
> URL: https://issues.apache.org/jira/browse/FLINK-2669
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Ufuk Celebi
>Priority: Minor
>
> Execution is {{Serializable}}, but non-transient field {{assignedResource}} 
> of type {{SimpleSlot}} is not.
> I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job 
> manager, which returns the {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2670) Instable Test S3FileSystemTest

2015-09-15 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2670:
--

 Summary: Instable Test S3FileSystemTest
 Key: FLINK-2670
 URL: https://issues.apache.org/jira/browse/FLINK-2670
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Matthias J. Sax
Priority: Critical


Fails with
{noformat}
==
Maven produced no output for 300 seconds.
==
{noformat}

https://travis-ci.org/apache/flink/jobs/80344487



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744974#comment-14744974
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140301180
  
Hi, I cleaned up all old commits and put a new commit on top that introduces 
`SourceFunction.stop()` and unblocks the stop signal using a dedicated thread. 
Please give feedback.

Btw: Travis fails due to an unstable test. My own Travis build is green: 
https://travis-ci.org/mjsax/flink/builds/80344479


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929
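
For illustration, a hypothetical sketch of a source that honors such a stop signal (the 
actual {{SourceFunction.stop()}} contract is defined in the pull request, not here):
{code}
// hypothetical stoppable source: stop() lets the run loop drain and finish cleanly
public class StoppableCounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            ctx.collect(counter++);
        }
    }

    @Override
    public void cancel() {
        // "hard" stop: same flag here, but without the clean-shutdown guarantees
        running = false;
    }

    // stop() as proposed in the PR: allows the source to end gracefully
    public void stop() {
        running = false;
    }
}
{code}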



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140301180
  
Hi, I cleaned up all old commits and put a new commit on top that introduces 
`SourceFunction.stop()` and unblocks the stop signal using a dedicated thread. 
Please give feedback.

Btw: Travis fails due to an unstable test. My own Travis build is green: 
https://travis-ci.org/mjsax/flink/builds/80344479


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...

2015-09-15 Thread lofifnc
Github user lofifnc commented on the pull request:

https://github.com/apache/flink/pull/1120#issuecomment-140301283
  
Hey,

This is mostly for testing purposes. I'm working on some tooling which 
takes a `StreamExecutionEnvironment` and performs integration tests. In the 
current state I'm not able to work with a `StreamExecutionEnvironment` defined 
with the Scala API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

2015-09-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140291773
  
As per discussion on the dev list, the `ExecutionConfig` has the 
`GlobalJobParameters`, which are useful if one type of config is used across 
all operators.

If each of the operators needs its own config, can you create an abstract 
base class for the Storm functions which takes a configuration as an argument?

BTW: There is no plan to remove the `withParameters()` method in the batch 
API. It is just not the encouraged mechanism any more...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744911#comment-14744911
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140291773
  
As per discussion on the dev list, the `ExecutionConfig` has the 
`GlobalJobParameters`, which are useful if one type of config is used across 
all operators.

If each of the operators needs its own config, can you create an abstract 
base class for the Storm functions which takes a configuration as an argument?

BTW: There is no plan to remove the `withParameters()` method in the batch 
API. It is just not the encouraged mechanism any more...


> Add configuration support in Storm-compatibility
> 
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: fangfengbin
>Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods have a config `Map` as their first 
> parameter. This map is currently not populated. Thus, Spouts and Bolts cannot 
> be configured with user-defined parameters. In order to support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper `Map` 
> object. Furthermore, the clients need to be extended to take a `Map` and 
> translate it into a Flink `Configuration` that is forwarded to the wrappers 
> for proper initialization of the map.
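
For reference, a rough sketch of the {{GlobalJobParameters}} mechanism mentioned above 
(illustrative only; {{StormConfig}}, {{env}} and {{userConfig}} are assumed names):
{code}
// one global config, registered on the ExecutionConfig and visible to all operators
public class StormConfig extends ExecutionConfig.GlobalJobParameters {

    private final Map<String, String> config = new HashMap<>();

    public StormConfig(Map<String, String> config) {
        this.config.putAll(config);
    }

    @Override
    public Map<String, String> toMap() {
        return config;
    }
}

// registering it on the environment:
env.getConfig().setGlobalJobParameters(new StormConfig(userConfig));
{code}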



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [hotfix][Table API tests]add toDataSet in tabl...

2015-09-15 Thread HuangWHWHW
GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1131

[hotfix][Table API tests]add toDataSet in table API tests

See the discussion in the PR https://github.com/apache/flink/pull/1098 for 
details.
And I'm not sure whether this change needs a JIRA or can just be a hotfix.

The problem is due to the ambiguous Table API methods between DataSet.scala and 
DataStream.scala.
The class Table can call methods in both DataSet.scala and DataStream.scala.
The class Table needs to specify `toDataSet` or `toDataStream` first when 
the same methods exist in both DataSet.scala and DataStream.scala.
So I added `toDataSet` (since there is currently only one such method, in 
DataSet.scala, but there will be another in DataStream.scala) in these Table API 
tests for the follow-up work in DataStream.scala.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2622-toDataSet

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1131


commit 62741ca7cfe6620d94717fe9eb6909e34da763d0
Author: HuangWHWHW <404823...@qq.com>
Date:   2015-09-15T06:28:17Z

[hotfix][Table API tests]add toDataSet in table API tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744978#comment-14744978
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
Storm only supports one global configuration that is shared across all 
spouts/bolts. So `GlobalJobParameters` will work just fine.


> Add configuration support in Storm-compatibility
> 
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: fangfengbin
>Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods have a config `Map` as their first 
> parameter. This map is currently not populated. Thus, Spouts and Bolts cannot 
> be configured with user-defined parameters. In order to support this feature, 
> the spout and bolt wrapper classes need to be extended to create a proper `Map` 
> object. Furthermore, the clients need to be extended to take a `Map` and 
> translate it into a Flink `Configuration` that is forwarded to the wrappers 
> for proper initialization of the map.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
Storm only supports one global configuration that is shared across all 
spouts/bolts. So `GlobalJobParameters` will work just fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-2669) Execution contains non-serializable field

2015-09-15 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-2669.
--
Resolution: Won't Fix

Yeah, I agree, and I didn't mean to "fix" this (that's also why I've labelled it 
as minor). I saw that RequestJob is just used for the memory archivist, and it 
was actually not what I needed. I know that there is nothing we can do about it 
at this point.

> Execution contains non-serializable field
> -
>
> Key: FLINK-2669
> URL: https://issues.apache.org/jira/browse/FLINK-2669
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: master, 0.9.1
>Reporter: Ufuk Celebi
>Priority: Minor
>
> Execution is {{Serializable}}, but non-transient field {{assignedResource}} 
> of type {{SimpleSlot}} is not.
> I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job 
> manager, which returns the {{ExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2671) Instable Test StreamCheckpointNotifierITCase

2015-09-15 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2671:
--

 Summary: Instable Test StreamCheckpointNotifierITCase
 Key: FLINK-2671
 URL: https://issues.apache.org/jira/browse/FLINK-2671
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Matthias J. Sax
Priority: Critical


{noformat}
Failed tests: 
  
StreamCheckpointNotifierITCase>StreamFaultToleranceTestBase.runCheckpointedProgram:105->postSubmit:115
 No checkpoint notification was received.{noformat}

https://travis-ci.org/apache/flink/jobs/80344489



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Hits

2015-09-15 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/765#issuecomment-140314739
  
Hi @mfahimazizi,

as I said, it is not possible to access the edge values inside the 
`VertexUpdateFunction`.
However, you can get the same functionality if you add the edge value 
inside the message that you create in the `MessagingFunction`. Alternatively, 
you can build your own delta iteration, instead of using the vertex-centric 
model.
If you're planning to finish this PR, then let us know and we can even sync 
on skype or so to help you out! Otherwise, please close this PR and hopefully 
someone else will pick up this issue.
Thank you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2637) Add abstract equals, hashCode and toString methods to TypeInformation

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745761#comment-14745761
 ] 

ASF GitHub Bot commented on FLINK-2637:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1134

[FLINK-2637] [api-breaking] [scala, types] Adds equals and hashCode method 
to TypeInformations and TypeSerializers

Adds abstract `equals`, `hashCode`, `canEqual` and `toString` methods to 
`TypeInformation`. Adds missing implementations to subtypes. The 
`canEqual(Object obj)` method returns true iff the `obj` can be equaled with 
the called instance.

Adds abstract `equals`, `hashCode` and `canEqual` methods to 
`TypeSerializer`.

Makes `CompositeType` subtypes serializable by removing non-serializable 
fields which were only used for the comparator construction. The comparator 
construction is now realized within a builder object which keeps the 
intermediate state. Consequently, the PR #943 is now obsolete and can be 
closed.  

Refactors the `ObjectArrayTypeInfo` so that the type extraction logic now 
happens within the `TypeExtractor` and no longer in the `TypeInformation` 
subtype.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixOptionTypeInfo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1134


commit 35a18b3f5148ec3fdddc318ea4d5971427fffda3
Author: Till Rohrmann 
Date:   2015-09-07T23:12:09Z

[FLINK-2637] [api-breaking] [scala, types] Adds equals and hashCode method 
to TypeInformations and TypeSerializers. Fixes ObjectArrayTypeInfo. Makes 
CompositeTypes serializable.

Adds test for equality relation's symmetric property




> Add abstract equals, hashCode and toString methods to TypeInformation
> -
>
> Key: FLINK-2637
> URL: https://issues.apache.org/jira/browse/FLINK-2637
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Fabian Hueske
>Assignee: Till Rohrmann
>  Labels: starter
> Fix For: 0.10
>
>
> Flink expects that implementations of {{TypeInformation}} have valid 
> implementations of {{hashCode}} and {{equals}}. However, the API does not 
> enforce implementing these methods. Hence, this is a common origin of bugs 
> such as, for example, FLINK-2633.
> This can be avoided by adding abstract {{hashCode}} and {{equals}} methods to 
> TypeInformation. An abstract {{toString}} method could also be added.
> This change will break the API and requires fixing a couple of broken 
> {{TypeInformation}} implementations.
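
To illustrate the contract described above (a sketch using an assumed {{MyTypeInfo}} 
subtype with an {{elementType}} field, not code from the PR):
{code}
@Override
public boolean equals(Object obj) {
    if (obj instanceof MyTypeInfo) {
        MyTypeInfo other = (MyTypeInfo) obj;
        // canEqual guards the symmetry of equals across the type hierarchy
        return other.canEqual(this) && elementType.equals(other.elementType);
    }
    return false;
}

@Override
public int hashCode() {
    return 31 * elementType.hashCode();
}

@Override
public boolean canEqual(Object obj) {
    return obj instanceof MyTypeInfo;
}
{code}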



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2410) PojoTypeInfo is not completely serializable

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745762#comment-14745762
 ] 

ASF GitHub Bot commented on FLINK-2410:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/943#issuecomment-140468581
  
The non-serializable problem of the `PojoTypeInfo` has been fixed in #1134. 
Thus, the PR can be closed.


> PojoTypeInfo is not completely serializable
> ---
>
> Key: FLINK-2410
> URL: https://issues.apache.org/jira/browse/FLINK-2410
> Project: Flink
>  Issue Type: Bug
>  Components: Java API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Table API requires PojoTypeInfo to be serializable. The following code fails:
> {code}
> Table finishedEtlTable = maxMeasurements
> .join(stationTable).where("s_station_id = m_station_id")
> .select("year, month, day, value, country, name");
> DataSet<MaxTemperature> maxTemp = tableEnv.toDataSet(finishedEtlTable, 
> MaxTemperature.class);
> maxTemp
> .groupBy("year")
> .sortGroup("value", Order.DESCENDING)
> .first(1)
> .print();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745055#comment-14745055
 ] 

Chiwan Park commented on FLINK-1745:


[~till.rohrmann] Could you assign this issue to [~danielblazevski]? I can't 
find him in the assignee list.

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2672) Add partitioned output format to HDFS RollingSink

2015-09-15 Thread Mohamed Amine ABDESSEMED (JIRA)
Mohamed Amine ABDESSEMED created FLINK-2672:
---

 Summary: Add partitioned output format to HDFS RollingSink
 Key: FLINK-2672
 URL: https://issues.apache.org/jira/browse/FLINK-2672
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Affects Versions: 0.10
Reporter: Mohamed Amine ABDESSEMED
Priority: Minor


An interesting use case of the HDFS sink is to dispatch data into multiple 
directories depending on attributes present in the source data.
For example, for some data with a timestamp and a status field, we want to 
write it into different directories using a pattern like: 
/somepath/%{timestamp}/%{status}

The expected results are something like: 
/somepath/some_timestamp/wellformed
/somepath/some_timestamp/malformed
/somepath/some_timestamp/incomplete 
... 
etc.

To support this functionality, the bucketing and checkpointing logic needs to be 
changed. 

Note: For now, this can be done with the current version of the rolling HDFS 
sink by splitting data streams and having multiple HDFS sinks.
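
For illustration, a rough sketch of that workaround (assuming the split/select API and 
the RollingSink from FLINK-2583; {{Record}} and {{getStatus()}} are made-up names):
{code}
// route each record by its status field and attach one rolling sink per target directory
OutputSelector<Record> byStatus = new OutputSelector<Record>() {
    @Override
    public Iterable<String> select(Record value) {
        return Collections.singletonList(value.getStatus()); // e.g. "wellformed"
    }
};

records.split(byStatus).select("wellformed")
    .addSink(new RollingSink<Record>("/somepath/wellformed"));
records.split(byStatus).select("malformed")
    .addSink(new RollingSink<Record>("/somepath/malformed"));
{code}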




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745078#comment-14745078
 ] 

ASF GitHub Bot commented on FLINK-2167:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140327645
  
Hi @twalthr, thanks for your contribution. But this PR contains many 
changes unrelated to the HCatalog format. Maybe we should split this PR into 
the HCatalog changes and the other changes.


> Add fromHCat() to TableEnvironment
> --
>
> Key: FLINK-2167
> URL: https://issues.apache.org/jira/browse/FLINK-2167
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Minor
>  Labels: starter
>
> Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} 
> from an HCatalog table.
> The implementation could reuse Flink's HCatInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

2015-09-15 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140327645
  
Hi @twalthr, thanks for your contribution. But this PR contains many 
changes unrelated to the HCatalog format. Maybe we should split this PR into 
the HCatalog changes and the other changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745094#comment-14745094
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488711
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Add validity bounds 0 < x < 1 to the message?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488711
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Add validity bounds 0 < x < 1 to the message?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39490684
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Shouldn't this be (HEAPSIZE / (1.0 - fraction)) - HEAPSIZE?
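
As a quick sanity check of that formula (illustrative numbers, not from the thread, and 
assuming the start script sets the JVM heap to total * (1 - fraction)): with a total 
TaskManager memory of 1000 MB and a fraction of 0.7, the heap is 300 MB. Then 
heap / (1.0 - fraction) - heap = 300 / 0.3 - 300 = 700 MB recovers the intended managed 
memory, whereas heap / fraction = 300 / 0.7 ≈ 429 MB does not.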


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745131#comment-14745131
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39490684
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Shouldn't this be (HEAPSIZE / (1.0 - fraction)) - HEAPSIZE?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493200
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Check alternative name as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745170#comment-14745170
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493200
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Check alternative name as well?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}
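
Regarding the "check alternative name as well?" remark above, a hypothetical sketch of 
such a check ({{getAlt()}} is an assumed accessor for the short name, not an existing 
method):
{code}
// a parameter counts as supplied if either its long name or its short (alternative) name is set
private boolean isSupplied(Option o, ParameterTool params) {
    boolean byName = params.data.containsKey(o.getName()) && !keyIsUndefined(o.getName(), params.data);
    boolean byAlt = o.getAlt() != null
            && params.data.containsKey(o.getAlt()) && !keyIsUndefined(o.getAlt(), params.data);
    return byName || byAlt;
}
{code}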



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493894
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Thanks, added a lower bound.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493902
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745063#comment-14745063
 ] 

Till Rohrmann commented on FLINK-1745:
--

Done.

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1745:
-
Assignee: Daniel Blazevski

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745062#comment-14745062
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140325179
  
Yeah, you're right, better to have an operational version of the sink first; 
follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672 


> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-15 Thread aminouvic
Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140325179
  
Yeah, you're right, better to have an operational version of the sink first. 
Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2665] [api-breaking] [runtime] Makes Ex...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1128#issuecomment-140325565
  
LGTM, will merge this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488877
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Are we sure that 1MB is enough for Netty?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745096#comment-14745096
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488925
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` is the deprecated key. In the existing 
Flink code, it is overridden by the new default network buffer size key 
`KEY_TASKM_MEM_SEGMENT_SIZE`.
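
For illustration only, a minimal sketch of the precedence described above, assuming hypothetical helper and key names (it is not the actual config.sh logic): the new key wins, and the deprecated key is only consulted when the new one is unset.

{code}
import java.util.Map;

class NetworkBufferSizeResolutionSketch {
    // Sketch of the fallback order: prefer the new segment-size key and only
    // read the deprecated network-buffer-size key when the new one is unset (0).
    static long resolveBufferSize(Map<String, Long> conf) {
        long segmentSize = conf.getOrDefault("KEY_TASKM_MEM_SEGMENT_SIZE", 0L);
        if (segmentSize > 0) {
            return segmentSize; // new key overrides the deprecated one
        }
        return conf.getOrDefault("KEY_TASKM_MEM_NETWORK_BUFFER_SIZE", 0L);
    }
}
{code}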


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745095#comment-14745095
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488877
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Are we sure that 1MB is enough for Netty?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488925
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` is the deprecated key. In the existing 
Flink code, it is overridden by the new default network buffer size key 
`KEY_TASKM_MEM_SEGMENT_SIZE`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745098#comment-14745098
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488999
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
+exit 1
+fi
+# recalculate the JVM heap memory by taking the off-heap 
ratio into account
+TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< 
"${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+fi
+fi
+
+export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M 
-Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE * 1024 * 1024 + 
FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))"
--- End diff --

Append "M" to -XX:MaxDirectMemorySize parameter?
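
To make the split in the quoted diff easier to follow, here is a compact sketch of the same arithmetic with hypothetical method and variable names and no validation; it also makes explicit that the -Xms/-Xmx values carry an "M" suffix while the MaxDirectMemorySize value is assembled in bytes, which is what the question above is about.

{code}
class TaskManagerMemorySplitSketch {
    // Mirrors the split in the quoted taskmanager.sh diff; values and names
    // are assumptions for illustration, not Flink defaults.
    static String jvmArgs(long totalMb, long managedMb, double fraction,
                          long networkBytes, boolean offHeap) {
        long nettyReserveBytes = 1024 * 1024;   // "some space for Netty initialization"
        long heapMb = totalMb;
        long offHeapMb = 0;
        if (offHeap) {
            if (managedMb > 0) {
                offHeapMb = managedMb;                      // explicit managed size wins
            } else {
                offHeapMb = Math.round(totalMb * fraction); // otherwise use the fraction
            }
            heapMb = totalMb - offHeapMb;
        }
        // -Xms/-Xmx get an explicit "M" suffix; MaxDirectMemorySize is computed
        // in bytes here, hence the review question about appending a unit.
        long directBytes = offHeapMb * 1024 * 1024 + networkBytes + nettyReserveBytes;
        return "-Xms" + heapMb + "M -Xmx" + heapMb + "M -XX:MaxDirectMemorySize=" + directBytes;
    }
}
{code}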


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...

2015-09-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1120#issuecomment-140326566
  
I think adding this accessor is fine.

Also, in case someone goes crazy and wants to mix the Java and Scala-style 
functions in one program ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745084#comment-14745084
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488168
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

Shouldn't it be the other way round, i.e., use 
`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` if defined and `KEY_TASKM_MEM_SEGMENT_SIZE` 
otherwise?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488168
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

Shouldn't it be the other way round, i.e., use 
`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` if defined and `KEY_TASKM_MEM_SEGMENT_SIZE` 
otherwise?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745092#comment-14745092
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488510
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

Typo: "initilization" is missing an "a" ("initialization").


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488510
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

Typo: "initilization" is missing an "a" ("initialization").


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745118#comment-14745118
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489771
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

line break


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489771
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

line break


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745117#comment-14745117
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489755
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Inconsistent code style: line break after closing `'}'`.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489755
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Inconsistent code style: line break after closing `'}'`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140335366
  
IMO, it would be better to wrap the `stopExecution` call in the 
`TaskManager` into a `Future`, for the following reasons: First, with the 
current implementation, you'll miss all exceptions that occur in the `stop` 
method. Second, you will send a positive `TaskOperationResult` back before 
the stopping has actually been executed. I haven't checked the semantics of 
the `TaskOperationResult`, but the JM, upon receiving this message, might 
think that the stop call was successfully executed.
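
A minimal sketch of the pattern being suggested, using java.util.concurrent purely for illustration (the actual TaskManager is Scala/Akka based, and `stopExecution`/`sendResult` are placeholder names): the acknowledgement is sent only from the completion callback, so failures are not lost and no positive result leaves before the stop has run.

{code}
import java.util.concurrent.CompletableFuture;

class StopTaskSketch {
    // Placeholder for the actual stop logic on the TaskManager.
    static void stopExecution() { /* ... */ }

    // Placeholder for replying to the JobManager.
    static void sendResult(boolean success, Throwable error) { /* ... */ }

    static void handleStopTask() {
        CompletableFuture
            .runAsync(StopTaskSketch::stopExecution)
            .whenComplete((ignored, error) -> {
                // Reply only after stopping has actually been attempted,
                // and forward any exception instead of swallowing it.
                sendResult(error == null, error);
            });
    }
}
{code}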


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745121#comment-14745121
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140335366
  
IMO, it would be better to wrap the `stopExecution` call in the 
`TaskManager` into a `Future`, for the following reasons: First, with the 
current implementation, you'll miss all exceptions that occur in the `stop` 
method. Second, you will send a positive `TaskOperationResult` back before 
the stopping has actually been executed. I haven't checked the semantics of 
the `TaskOperationResult`, but the JM, upon receiving this message, might 
think that the stop call was successfully executed.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which 
> is a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39494490
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Never mind, it's (HEAPSIZE / (1.0 - fraction)) * fraction.
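
To make the arithmetic in this remark concrete, a tiny self-contained sketch (the numbers are made up): since the start script has already shrunk the JVM heap to total * (1 - fraction), the off-heap share can be recovered from the heap size alone.

{code}
class OffHeapFractionSketch {
    //   total   = heap / (1 - fraction)
    //   offHeap = total * fraction = (heap / (1 - fraction)) * fraction
    public static void main(String[] args) {
        double totalMb = 1000.0;  // assumed total TaskManager memory
        double fraction = 0.7;    // assumed managed-memory fraction
        double heapMb = totalMb * (1.0 - fraction);                // 300.0
        double offHeapMb = (heapMb / (1.0 - fraction)) * fraction; // 700.0
        System.out.println(heapMb + " MB heap, " + offHeapMb + " MB off-heap");
    }
}
{code}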


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745193#comment-14745193
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39494490
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Never mind, it's (HEAPSIZE / (1.0 - fraction)) * fraction.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2582) Document how to build Flink with other Scala versions

2015-09-15 Thread Chiwan Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chiwan Park resolved FLINK-2582.

Resolution: Fixed

> Document how to build Flink with other Scala versions
> -
>
> Key: FLINK-2582
> URL: https://issues.apache.org/jira/browse/FLINK-2582
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> One can build Flink for different Scala versions.
> We should describe in the documentation how to do that, ideally next to 
> building for different Hadoop versions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488999
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
+exit 1
+fi
+# recalculate the JVM heap memory by taking the off-heap 
ratio into account
+TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< 
"${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+fi
+fi
+
+export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M 
-Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE * 1024 * 1024 + 
FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))"
--- End diff --

Append "M" to -XX:MaxDirectMemorySize parameter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492715
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
--- End diff --

Check overwriting of existing option with same name?
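
A hedged sketch of the kind of guard being asked for here, with a stand-in `Option` type and an assumed exception choice, not the PR's actual code:

{code}
import java.util.HashMap;
import java.util.Map;

class RequiredParameterSketch {
    static class Option {
        final String name;
        Option(String name) { this.name = name; }
    }

    private final Map<String, Option> data = new HashMap<>();

    // Fail loudly instead of silently overwriting an option
    // that was already registered under the same name.
    void add(Option option) {
        if (data.containsKey(option.name)) {
            throw new IllegalArgumentException(
                "Option '" + option.name + "' has already been defined.");
        }
        data.put(option.name, option);
    }
}
{code}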


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745167#comment-14745167
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492715
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
--- End diff --

Check overwriting of existing option with same name?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745171#comment-14745171
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493290
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
+   throw new RequiredParameterException("Required 
parameter " + o.getName() + " not present.");
+   }
+   }
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied. If 
not use the default values
+* which have been supplied. If no default value is supplied for a 
missing parameter, an exception is thrown.
+*
+* @param parameterTool - parameters supplied by the user.
+*/
+   public void checkAndPopulate(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   String key = o.getName();
--- End diff --

Check for alternative / short key as well
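
One possible shape of that lookup, sketched with a stand-in `Option` type; the accessor for the short/alternative name is an assumption (the JIRA proposal only shows the `.alt("i")` builder), not the PR's actual API.

{code}
import java.util.Map;

class AlternativeKeyLookupSketch {
    static class Option {
        final String name;
        final String alt;   // short/alternative name, may be null
        Option(String name, String alt) { this.name = name; this.alt = alt; }
    }

    // Resolve a value by the long name first, then by the short name if present.
    static String resolve(Option o, Map<String, String> suppliedParams) {
        if (suppliedParams.containsKey(o.name)) {
            return suppliedParams.get(o.name);
        }
        if (o.alt != null && suppliedParams.containsKey(o.alt)) {
            return suppliedParams.get(o.alt);
        }
        return null; // caller decides whether to use a default or fail
    }
}
{code}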


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493290
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
+   throw new RequiredParameterException("Required 
parameter " + o.getName() + " not present.");
+   }
+   }
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied. If 
not use the default values
+* which have been supplied. If no default value is supplied for a 
missing parameter, an exception is thrown.
+*
+* @param parameterTool - parameters supplied by the user.
+*/
+   public void checkAndPopulate(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   String key = o.getName();
--- End diff --

Check for alternative / short key as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745182#comment-14745182
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140343995
  
Thanks for the PR @mliesenberg!
Can you add the checks (and tests to verify the checks are working) that I 
mentioned in my comments?
The original JIRA issue also included type checks. I think we could 
restrict those to Java primitives (Integer, Long, Double, Float, Boolean). 
I would be OK with merging this PR without type checks (and opening another 
JIRA for that) but it would be a cool feature if you'd like to add that.
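
For the wrapper types named above, such a type check could look roughly like the sketch below; the method name and error handling are assumptions, not the PR's API.

{code}
class ParameterTypeCheckSketch {
    // Returns true if the supplied string value can be interpreted as the given type.
    static boolean matchesType(String value, Class<?> type) {
        try {
            if (type == Integer.class)      { Integer.parseInt(value); }
            else if (type == Long.class)    { Long.parseLong(value); }
            else if (type == Double.class)  { Double.parseDouble(value); }
            else if (type == Float.class)   { Float.parseFloat(value); }
            else if (type == Boolean.class) {
                return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
            }
            else { return false; } // unsupported type
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
{code}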


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745184#comment-14745184
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493902
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initialization
--- End diff --

Thanks


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493918
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Thanks.




[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745186#comment-14745186
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493933
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

Thanks


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493898
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initialization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Yes, actually it needs much less. This is just some static initialization 
code Netty runs.




[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Yes, actually it is (HEAPSIZE /  fraction) * (1.0 - fraction) :)
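
A quick worked example of that formula (the numbers are made up purely for illustration):

{code}
public class OffHeapSizeExample {
    public static void main(String[] args) {
        // With a heap of 700 MB and a fraction of 0.7, the total is 700 / 0.7 = 1000 MB,
        // so the off-heap share is 1000 * (1.0 - 0.7) = 300 MB.
        long heapSizeMb = 700;
        double fraction = 0.7;
        long offHeapMb = (long) ((heapSizeMb / fraction) * (1.0 - fraction));
        System.out.println(offHeapMb + " MB"); // prints 300 MB
    }
}
{code}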




[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140343995
  
Thanks for the PR @mliesenberg!
Can you add the checks (and tests to verify the checks are working) that I 
mentioned in my comments?
The original JIRA issue also included type checks. I think we could 
restrict those to Java primitives (Integer, Long, Double, Float, Boolean). 
I would be OK with adding this PR without type checks (and opening another 
JIRA for that) but it would be a cool feature if you'd like to add that.




[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745181#comment-14745181
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493894
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initialization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Thanks, added a lower bound.
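
To make the arithmetic of that branch easier to follow, here is a rough Java rendering of the split the script performs when an explicit managed-memory size is configured. The names and MB units are assumptions mirroring the quoted taskmanager.sh logic, and the fraction-based branch is left out because it is cut off in the quoted diff:

{code}
public class MemorySplitSketch {
    /** Returns {heapMb, offHeapMb}: heap feeds -Xms/-Xmx, off-heap feeds -XX:MaxDirectMemorySize. */
    static long[] splitMemoryMb(long totalMb, long managedMb) {
        if (totalMb <= managedMb) {
            throw new IllegalArgumentException(
                "The configured TaskManager memory size must be larger than the managed memory size.");
        }
        long offHeapMb = managedMb;
        long heapMb = totalMb - managedMb;
        return new long[] { heapMb, offHeapMb };
    }

    public static void main(String[] args) {
        long[] split = splitMemoryMb(2048, 512);
        // Example output: -Xmx1536m -XX:MaxDirectMemorySize=512m
        System.out.println("-Xmx" + split[0] + "m -XX:MaxDirectMemorySize=" + split[1] + "m");
    }
}
{code}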


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493933
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

Thanks




[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745183#comment-14745183
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493898
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initialization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Yes, actually it needs much less. This is just some static initialization 
code Netty runs.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745185#comment-14745185
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493918
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Thanks.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745180#comment-14745180
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Yes, actually it is (HEAPSIZE /  fraction) * (1.0 - fraction) :)


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745054#comment-14745054
 ] 

Chiwan Park commented on FLINK-1745:


Hi [~danielblazevski], you can modify my implementation by pulling my issue 
branch (https://github.com/chiwanpark/flink/tree/FLINK-1745). Once it is 
implemented, please open a pull request for review. Then I'll close my PR and 
review yours.

If you have any questions about my implementation, please post them in this 
thread. :-)

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140336732
  
Thanks for the PR @mxm.
I left a few comments inline. 




[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745132#comment-14745132
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140336732
  
Thanks for the PR @mxm.
I left a few comments inline. 


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745161#comment-14745161
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492448
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined function.
+ */
+public class Option {
+
+   private String longName;
+   private String shortName;
+
+   private T defaultValue;
+   private Set choices;
+
+   private String helpText;
+
+   public Option(String name) {
+   this.longName = name;
+   this.choices = new HashSet<>();
+   }
+
+   /**
+* Define an alternative / short name for the parameter.
+*
+* @param shortName - short version of the parameter name
+* @return the updated Option
+*/
+   public Option alt(String shortName) {
+   this.shortName = shortName;
+   return this;
+   }
+
+   /**
+* Define a default value for the option.
+*
+* Throws an exception if the list of possible values for the parameter is not empty and the default value passed is not in the list.
+*
+* @param defaultValue - the default value
+* @return the updated Option
+*/
+   public Option defaultValue(T defaultValue) {
+   if (this.choices.isEmpty()) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   if (this.choices.contains(defaultValue)) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   throw new IllegalArgumentException("defaultValue passed is not in the list of expected values.");
+   }
+   }
+   }
+
+   /**
+* Restrict the list of possible values of the parameter.
+*
+* @param choices - the allowed values of the parameter.
+* @return the updated Option
+*/
+   public Option choices(T... choices) {
+   Collections.addAll(this.choices, choices);
--- End diff --

Add check that `defaultValue` (if already set) is in choices.


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); 

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492448
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined function.
+ */
+public class Option {
+
+   private String longName;
+   private String shortName;
+
+   private T defaultValue;
+   private Set choices;
+
+   private String helpText;
+
+   public Option(String name) {
+   this.longName = name;
+   this.choices = new HashSet<>();
+   }
+
+   /**
+* Define an alternative / short name for the parameter.
+*
+* @param shortName - short version of the parameter name
+* @return the updated Option
+*/
+   public Option alt(String shortName) {
+   this.shortName = shortName;
+   return this;
+   }
+
+   /**
+* Define a default value for the option.
+*
+* Throws an exception if the list of possible values for the parameter is not empty and the default value passed is not in the list.
+*
+* @param defaultValue - the default value
+* @return the updated Option
+*/
+   public Option defaultValue(T defaultValue) {
+   if (this.choices.isEmpty()) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   if (this.choices.contains(defaultValue)) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   throw new IllegalArgumentException("defaultValue passed is not in the list of expected values.");
+   }
+   }
+   }
+
+   /**
+* Restrict the list of possible values of the parameter.
+*
+* @param choices - the allowed values of the parameter.
+* @return the updated Option
+*/
+   public Option choices(T... choices) {
+   Collections.addAll(this.choices, choices);
--- End diff --

Add check that `defaultValue` (if already set) is in choices.
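
For illustration, one way the suggested check could be folded into the quoted method; this is a sketch only, not the PR's code:

{code}
public Option choices(T... choices) {
    Collections.addAll(this.choices, choices);
    // Suggested check: an already configured default value must be one of the allowed choices.
    if (this.defaultValue != null && !this.choices.contains(this.defaultValue)) {
        throw new IllegalArgumentException(
            "The default value " + this.defaultValue + " is not in the list of valid values.");
    }
    return this;
}
{code}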




[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745592#comment-14745592
 ] 

ASF GitHub Bot commented on FLINK-2675:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1133

[FLINK-2675] [streaming] Add utilities for scheduled triggers.

These utilities are used by processing time triggers to schedule 
evaluations for the future. They are the first part of reworking the streaming 
windows to make them robust and faster.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink triggers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1133


commit f57d0d68a691a518482935fed9290efad6f30dbd
Author: Stephan Ewen 
Date:   2015-09-15T12:55:41Z

[FLINK-2675] [streaming] Add utilities for scheduled triggers.




> Add utilities for scheduled triggers
> 
>
> Key: FLINK-2675
> URL: https://issues.apache.org/jira/browse/FLINK-2675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> These utilities help schedule triggers for the future, ensure non-concurrent 
> trigger execution, and proper trigger shutdown and release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

2015-09-15 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1133

[FLINK-2675] [streaming] Add utilities for scheduled triggers.

These utilities are used by processing time triggers to schedule 
evaluations for the future. They are the first part of reworking the streaming 
windows to make them robust and faster.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink triggers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1133


commit f57d0d68a691a518482935fed9290efad6f30dbd
Author: Stephan Ewen 
Date:   2015-09-15T12:55:41Z

[FLINK-2675] [streaming] Add utilities for scheduled triggers.






[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745616#comment-14745616
 ] 

ASF GitHub Bot commented on FLINK-2675:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1133#discussion_r39526103
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
+ * which means that never more than one trigger will be executed at the same time.
+ * 
+ * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
+ * different times, the one scheduled for the later time will be executed after the one scheduled for the
+ * earlier time.
+ */
+public class TriggerTimer {
+   
+   /** The thread group that holds all trigger timer threads */
+   public static final ThreadGroup TRIGGER_THREADS_GROUP = new 
ThreadGroup("Triggers");
+   
+   /** The executor service that */
--- End diff --

ah, right ;-)


> Add utilities for scheduled triggers
> 
>
> Key: FLINK-2675
> URL: https://issues.apache.org/jira/browse/FLINK-2675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> These utilities help schedule triggers for the future, ensure non-concurrent 
> trigger execution, and proper trigger shutdown and release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [hotfix][Table API tests]add toDataSet in tabl...

2015-09-15 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1131#issuecomment-140420760
  
+1 from me too.




[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

2015-09-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1133#discussion_r39526103
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
+ * which means that never more than one trigger will be executed at the same time.
+ * 
+ * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
+ * different times, the one scheduled for the later time will be executed after the one scheduled for the
+ * earlier time.
+ */
+public class TriggerTimer {
+   
+   /** The thread group that holds all trigger timer threads */
+   public static final ThreadGroup TRIGGER_THREADS_GROUP = new 
ThreadGroup("Triggers");
+   
+   /** The executor service that */
--- End diff --

ah, right ;-)
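
For readers skimming the archive, a minimal sketch of the scheduling pattern the Javadoc quoted above describes: a single-threaded ScheduledExecutorService ensures that no two triggers run concurrently and that a trigger scheduled for a later time fires after one scheduled earlier. The class and method names below are illustrative only, not the PR's API:

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadedTriggerSketch {

    // One thread executes all triggers, so trigger executions never overlap.
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    /** Schedules the target to run at the given absolute timestamp (milliseconds). */
    public void scheduleTriggerAt(Runnable target, long timestamp) {
        long delay = Math.max(0L, timestamp - System.currentTimeMillis());
        executor.schedule(target, delay, TimeUnit.MILLISECONDS);
    }

    /** Releases the timer thread; pending triggers are dropped. */
    public void shutdown() {
        executor.shutdownNow();
    }
}
{code}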




[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

2015-09-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1133#discussion_r39525474
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
+ * which means that never more than one trigger will be executed at the same time.
+ * 
+ * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
+ * different times, the one scheduled for the later time will be executed after the one scheduled for the
+ * earlier time.
+ */
+public class TriggerTimer {
+   
+   /** The thread group that holds all trigger timer threads */
+   public static final ThreadGroup TRIGGER_THREADS_GROUP = new 
ThreadGroup("Triggers");
+   
+   /** The executor service that */
--- End diff --

some text missing




[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745609#comment-14745609
 ] 

ASF GitHub Bot commented on FLINK-2675:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1133#discussion_r39525474
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
+ * which means that never more than one trigger will be executed at the same time.
+ * 
+ * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
+ * different times, the one scheduled for the later time will be executed after the one scheduled for the
+ * earlier time.
+ */
+public class TriggerTimer {
+   
+   /** The thread group that holds all trigger timer threads */
+   public static final ThreadGroup TRIGGER_THREADS_GROUP = new 
ThreadGroup("Triggers");
+   
+   /** The executor service that */
--- End diff --

some text missing


> Add utilities for scheduled triggers
> 
>
> Key: FLINK-2675
> URL: https://issues.apache.org/jira/browse/FLINK-2675
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> These utilities help schedule triggers for the future, ensure non-concurrent 
> trigger execution, and proper trigger shutdown and release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347951
  
If you add a type field (or enum) to the option, you can check if you can 
cast the string into the requested type. Since we would only support a fixed 
set of types (the primitives I listed above) this should be quite easy.
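
As a rough sketch of that idea (the enum and method names are made up here and are not part of the PR):

{code}
enum OptionType { INTEGER, LONG, DOUBLE, FLOAT, BOOLEAN, STRING }

final class TypeCheckSketch {
    /** Returns true if the raw string value can be interpreted as the requested type. */
    static boolean matchesType(String value, OptionType type) {
        try {
            switch (type) {
                case INTEGER: Integer.parseInt(value); return true;
                case LONG:    Long.parseLong(value); return true;
                case DOUBLE:  Double.parseDouble(value); return true;
                case FLOAT:   Float.parseFloat(value); return true;
                case BOOLEAN: return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
                default:      return true; // STRING always matches
            }
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
{code}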




[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745212#comment-14745212
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347951
  
If you add a type field (or enum) to the option, you can check if you can 
cast the string into the requested type. Since we would only support a fixed 
set of types (the primitives I listed above) this should be quite easy.


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745227#comment-14745227
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140349703
  
Thanks for the update.
Good to merge, IMO


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140349703
  
Thanks for the update.
Good to merge, IMO




[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745245#comment-14745245
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39497481
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Sounds good to me. 


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas on how to design the API:
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = required.add("parallelism").alt("p").type(Integer.class); // parameter with type
>   Option spOption = required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number specifying the number of parallel data source instances"); // parameter with default value, specifying the type.
>   Option executionType = required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined", "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745260#comment-14745260
 ] 

ASF GitHub Bot commented on FLINK-2167:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140355813
  
Hi @chiwanpark, yes, I think splitting it up makes sense. I just opened this 
PR to get some feedback and to show why my changes are necessary to integrate 
new input formats like HCatalog. You can ignore the `HCatTableSource` class as 
it is still untested anyway.


> Add fromHCat() to TableEnvironment
> --
>
> Key: FLINK-2167
> URL: https://issues.apache.org/jira/browse/FLINK-2167
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Minor
>  Labels: starter
>
> Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} 
> from an HCatalog table.
> The implementation could reuse Flink's HCatInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK1919] add HCatOutputFormat

2015-09-15 Thread jamescao
Github user jamescao commented on the pull request:

https://github.com/apache/flink/pull/1079#issuecomment-140359144
  
@chiwanpark 
The PR is now updated. I pulled out the code related to HCatInputFormat and 
incorporated FLINK-2555 and FLINK-2617. I also changed the test environment 
from `CollectionsEnviroment` to `LocalEnviroment` to cover testing of the 
de-serialization of the HCatOutputFormat.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2674) Rework windowing logic

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2674:
---

 Summary: Rework windowing logic
 Key: FLINK-2674
 URL: https://issues.apache.org/jira/browse/FLINK-2674
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Priority: Critical
 Fix For: 0.10


The windowing logic needs a major overhaul. This follows the design documents: 
  - https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
  - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830

Specifically, the following shortcomings need to be addressed:

  - Global parallel windows should be dropped
   -> for time, local windows are aligned and serve the same purpose
   -> there is currently no known robust and efficient parallel 
implementation of custom strategies 

  - Event time and out of order arrival needs to be supported

  - Eviction of non-accessed keys does not work; such keys linger indefinitely

  - Performance is currently bad for time windows, due to an overly general 
implementation

  - Resources are leaking; threads are not shut down

  - Elements are stored multiple times (discretizers, window buffers)

  - Finally, many implementations are buggy and produce wrong results



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2675) Add utilities for scheduled triggers

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2675:
---

 Summary: Add utilities for scheduled triggers
 Key: FLINK-2675
 URL: https://issues.apache.org/jira/browse/FLINK-2675
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


These utilities help schedule triggers for the future, ensure non-concurrent 
trigger execution, and guarantee proper trigger shutdown and release.
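A minimal sketch of what such a utility could look like (the class name and structure
below are assumptions of this note, not the actual implementation), built on a plain
ScheduledExecutorService:

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: schedules triggers for the future, serializes their execution via a
// shared lock, and releases the scheduler thread on shutdown.
public class ScheduledTriggerSketch implements AutoCloseable {

	private final ScheduledExecutorService executor =
			Executors.newSingleThreadScheduledExecutor();
	private final Object triggerLock = new Object();

	/** Schedules a trigger to fire once after the given delay. */
	public void scheduleTrigger(final Runnable trigger, long delayMillis) {
		executor.schedule(new Runnable() {
			@Override
			public void run() {
				// ensures triggers never run concurrently with each other or with
				// code that synchronizes on the same lock
				synchronized (triggerLock) {
					trigger.run();
				}
			}
		}, delayMillis, TimeUnit.MILLISECONDS);
	}

	/** Cancels pending triggers and releases the scheduler thread. */
	@Override
	public void close() {
		executor.shutdownNow();
	}

	public static void main(String[] args) throws InterruptedException {
		try (ScheduledTriggerSketch triggers = new ScheduledTriggerSketch()) {
			triggers.scheduleTrigger(new Runnable() {
				@Override
				public void run() {
					System.out.println("window trigger fired");
				}
			}, 100);
			Thread.sleep(200); // give the trigger a chance to fire before shutdown
		}
	}
}
{code}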



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2678) Java/Scala API does not support multi-dimensional arrays as keys

2015-09-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2678:


 Summary: Java/Scala API does not support multi-dimensional arrays 
as keys
 Key: FLINK-2678
 URL: https://issues.apache.org/jira/browse/FLINK-2678
 Project: Flink
  Issue Type: Wish
  Components: Java API, Scala API
Reporter: Till Rohrmann
Priority: Minor


The Java/Scala API does not support grouping/sorting on fields which are 
multi-dimensional arrays. It could be helpful to also support these types.
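Until that is supported, one possible workaround is to map the array field to a
comparable key via a KeySelector. The following is an untested sketch with made-up
example data, not part of this ticket:

{code}
import java.util.Arrays;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch: grouping on a multi-dimensional array field by flattening the array into a
// String key with a KeySelector instead of using the field position as key.
public class MultiDimArrayKeyWorkaround {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<int[][], Integer>> data = env.fromElements(
				new Tuple2<>(new int[][]{{1, 2}, {3, 4}}, 10),
				new Tuple2<>(new int[][]{{1, 2}, {3, 4}}, 32));

		// groupBy(0) would reject the multi-dimensional array field as a key;
		// a KeySelector that maps the array to a String sidesteps the restriction.
		data.groupBy(new KeySelector<Tuple2<int[][], Integer>, String>() {
					@Override
					public String getKey(Tuple2<int[][], Integer> value) {
						return Arrays.deepToString(value.f0);
					}
				})
				.reduce(new ReduceFunction<Tuple2<int[][], Integer>>() {
					@Override
					public Tuple2<int[][], Integer> reduce(
							Tuple2<int[][], Integer> a, Tuple2<int[][], Integer> b) {
						return new Tuple2<>(a.f0, a.f1 + b.f1);
					}
				})
				.print();
	}
}
{code}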



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2679) Scala API does not support Try type as key

2015-09-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2679:
-
Summary: Scala API does not support Try type as key  (was: Scala API does 
not support Try type as keys)

> Scala API does not support Try type as key
> --
>
> Key: FLINK-2679
> URL: https://issues.apache.org/jira/browse/FLINK-2679
> Project: Flink
>  Issue Type: Wish
>  Components: Scala API
>Reporter: Till Rohrmann
>Priority: Minor
>
> The Scala API does not support using the {{Try}} type as a key (used, e.g., for 
> sorting and grouping). It could be helpful to add support for this type.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2679) Scala API does not support Try type as keys

2015-09-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2679:


 Summary: Scala API does not support Try type as keys
 Key: FLINK-2679
 URL: https://issues.apache.org/jira/browse/FLINK-2679
 Project: Flink
  Issue Type: Wish
  Components: Scala API
Reporter: Till Rohrmann
Priority: Minor


The Scala API does not support using the {{Try}} type as a key (used, e.g., for 
sorting and grouping). It could be helpful to add support for this type.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

