[jira] [Commented] (FLINK-2566) FlinkTopologyContext not populated completely
[ https://issues.apache.org/jira/browse/FLINK-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14746405#comment-14746405 ] ASF GitHub Bot commented on FLINK-2566: --- GitHub user mjsax opened a pull request: https://github.com/apache/flink/pull/1135 [FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get cleaner package structure You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/flink flink-2566-topologyContext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1135 commit 07473990fcbe06cff2305afcd78d5aef74895736 Author: mjsax Date: 2015-09-15T21:59:31Z [FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get cleaner package structure > FlinkTopologyContext not populated completely > - > > Key: FLINK-2566 > URL: https://issues.apache.org/jira/browse/FLINK-2566 > Project: Flink > Issue Type: Improvement > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > Currently FlinkTopologyContext is not populated completely. It only contains > enough information to make WordCount example work. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2566] FlinkTopologyContext not populate...
GitHub user mjsax opened a pull request: https://github.com/apache/flink/pull/1135 [FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get cleaner package structure You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/flink flink-2566-topologyContext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1135 commit 07473990fcbe06cff2305afcd78d5aef74895736 Author: mjsax Date: 2015-09-15T21:59:31Z [FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get cleaner package structure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Hits
Github user mfahimazizi commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-140633835 Hi @vasia, Thank you. For this algorithm, we need to calculate edge values inside the VertexUpdateFunction, so I will try to build my own delta iteration.
[jira] [Updated] (FLINK-2595) Unclosed JarFile may leak resource in ClassLoaderUtilsTest
[ https://issues.apache.org/jira/browse/FLINK-2595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-2595: -- Description: Here is related code: {code} try { new JarFile(validJar.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); fail("test setup broken: cannot create a valid jar file"); } {code} When an exception happens, the JarFile instance is not closed. was: Here is related code: {code} try { new JarFile(validJar.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); fail("test setup broken: cannot create a valid jar file"); } {code} When an exception happens, the JarFile instance is not closed. > Unclosed JarFile may leak resource in ClassLoaderUtilsTest > -- > > Key: FLINK-2595 > URL: https://issues.apache.org/jira/browse/FLINK-2595 > Project: Flink > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > > Here is related code: > {code} > try { > new JarFile(validJar.getAbsolutePath()); > } > catch (Exception e) { > e.printStackTrace(); > fail("test setup broken: cannot create a > valid jar file"); > } > {code} > When an exception happens, the JarFile instance is not closed.
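The fix the report implies is to close the JarFile on the exceptional path as well, which Java 7 try-with-resources does automatically. A minimal sketch of the pattern; TrackingResource is a stand-in for JarFile (introduced here only so the example runs without a jar file on disk), not anything from the actual test:

```java
// Sketch of the pattern suggested by FLINK-2595: open the resource in a
// try-with-resources block so close() runs even when the body throws.
// TrackingResource is a hypothetical stand-in for JarFile; the real fix
// would wrap `new JarFile(validJar.getAbsolutePath())` the same way.
public class ResourceLeakDemo {
    static class TrackingResource implements AutoCloseable {
        boolean closed = false;
        @Override
        public void close() { closed = true; }
    }

    public static TrackingResource openAndFail() {
        TrackingResource r = new TrackingResource();
        try (TrackingResource resource = r) {
            // simulate the body failing, like a broken test setup
            throw new IllegalStateException("test setup broken");
        } catch (IllegalStateException e) {
            // resource.close() already ran before this handler was entered
        }
        return r;
    }

    public static void main(String[] args) {
        System.out.println("closed=" + openAndFail().closed); // closed=true
    }
}
```

With the original try/catch, nothing closes the JarFile when the constructor succeeds but a later statement throws; try-with-resources makes that impossible by construction.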
[jira] [Updated] (FLINK-2649) Potential resource leak in JarHelper#unjar()
[ https://issues.apache.org/jira/browse/FLINK-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-2649: -- Description: {code} dest = new BufferedOutputStream(fos, BUFFER_SIZE); while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { dest.write(data, 0, count); } dest.flush(); dest.close(); {code} The close() of dest should be enclosed in a finally block. was: {code} dest = new BufferedOutputStream(fos, BUFFER_SIZE); while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { dest.write(data, 0, count); } dest.flush(); dest.close(); {code} The close() of dest should be enclosed in a finally block. > Potential resource leak in JarHelper#unjar() > > > Key: FLINK-2649 > URL: https://issues.apache.org/jira/browse/FLINK-2649 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu > > {code} > dest = new BufferedOutputStream(fos, BUFFER_SIZE); > while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) { > dest.write(data, 0, count); > } > dest.flush(); > dest.close(); > {code} > The close() of dest should be enclosed in a finally block. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
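A sketch of the suggested fix: the same copy loop, with dest.close() moved into a finally block so the stream is released even if read() or write() throws. Byte-array streams stand in for the jar-entry input stream and file output stream so the example is self-contained; this mirrors the snippet above but is not the actual JarHelper code:

```java
import java.io.*;

// Sketch of the fix suggested in FLINK-2649: guard dest.close() with a
// finally block. The close() runs on both the normal and the exceptional
// path, so the buffered stream (and the wrapped output stream) never leaks.
public class UnjarCloseDemo {
    static final int BUFFER_SIZE = 4;

    public static void copy(InputStream jis, OutputStream fos) throws IOException {
        BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE);
        try {
            byte[] data = new byte[BUFFER_SIZE];
            int count;
            while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
                dest.write(data, 0, count);
            }
            dest.flush();
        } finally {
            dest.close(); // always executed, even if read/write threw
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        copy(new ByteArrayInputStream(new byte[]{1, 2, 3, 4, 5}), bos);
        System.out.println(bos.size()); // 5
    }
}
```

In Java 7+ the same effect is obtained more concisely with try-with-resources around the BufferedOutputStream.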
[jira] [Updated] (FLINK-2566) FlinkTopologyContext not populated completely
[ https://issues.apache.org/jira/browse/FLINK-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated FLINK-2566: --- Summary: FlinkTopologyContext not populated completely (was: FlinkTopologyContext no populated completely) > FlinkTopologyContext not populated completely > - > > Key: FLINK-2566 > URL: https://issues.apache.org/jira/browse/FLINK-2566 > Project: Flink > Issue Type: Improvement > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > Currently FlinkTopologyContext is not populated completely. It only contains > enough information to make WordCount example work.
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14746552#comment-14746552 ] Daniel Blazevski commented on FLINK-1745: - Thanks! Pulled and built Chiwan's branch, and further made very minor changes to his test to familiarize myself more with DataSets. Dan On Tue, Sep 15, 2015 at 4:47 AM, Till Rohrmann (JIRA)> Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann >Assignee: Daniel Blazevski > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a means to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
[jira] [Commented] (FLINK-2669) Execution contains non-serializable field
[ https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744914#comment-14744914 ] Stephan Ewen commented on FLINK-2669: - The serializability of the execution graph is tricky right now. It is properly serializable after calling {{prepareForArchiving()}}, and can be stored serialized. It is not serializable before. > Execution contains non-serializable field > - > > Key: FLINK-2669 > URL: https://issues.apache.org/jira/browse/FLINK-2669 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: master, 0.9.1 >Reporter: Ufuk Celebi >Priority: Minor > > Execution is {{Serializable}}, but non-transient field {{assignedResource}} > of type {{SimpleSlot}} is not. > I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job > manager, which returns the {{ExecutionGraph}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2669) Execution contains non-serializable field
[ https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744915#comment-14744915 ] Stephan Ewen commented on FLINK-2669: - This is by (admittedly somewhat tricky) design... > Execution contains non-serializable field > - > > Key: FLINK-2669 > URL: https://issues.apache.org/jira/browse/FLINK-2669 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: master, 0.9.1 >Reporter: Ufuk Celebi >Priority: Minor > > Execution is {{Serializable}}, but non-transient field {{assignedResource}} > of type {{SimpleSlot}} is not. > I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job > manager, which returns the {{ExecutionGraph}}.
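The failure mode described in the issue can be reproduced in plain Java: serializing an object whose non-transient field is not Serializable throws NotSerializableException, while a transient field is simply skipped. The classes below are simplified stand-ins (Slot plays the role of SimpleSlot), not the actual Flink types:

```java
import java.io.*;

// Reproduces the problem behind FLINK-2669 in miniature: a Serializable
// class with a non-transient, non-Serializable field cannot be written
// with ObjectOutputStream, while a transient field is silently dropped.
public class TransientFieldDemo {
    static class Slot { }  // not Serializable, like SimpleSlot

    static class WithPlainField implements Serializable {
        Slot assignedResource = new Slot();   // breaks serialization
    }

    static class WithTransientField implements Serializable {
        transient Slot assignedResource = new Slot();  // skipped on write
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        try {
            serialize(new WithPlainField());
            System.out.println("unexpected: plain field serialized");
        } catch (NotSerializableException e) {
            System.out.println("plain field fails to serialize");
        }
        serialize(new WithTransientField()); // succeeds: field is skipped
        System.out.println("transient field ok");
    }
}
```

Which also shows why "won't fix" is coherent here: marking the field transient (or clearing it, as prepareForArchiving() does) is the standard way out, at the cost of losing that field on the wire.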
[jira] [Created] (FLINK-2670) Instable Test S3FileSystemTest
Matthias J. Sax created FLINK-2670: -- Summary: Instable Test S3FileSystemTest Key: FLINK-2670 URL: https://issues.apache.org/jira/browse/FLINK-2670 Project: Flink Issue Type: Bug Components: Tests Reporter: Matthias J. Sax Priority: Critical Fails with {noformat} == Maven produced no output for 300 seconds. == {noformat} https://travis-ci.org/apache/flink/jobs/80344487
[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs
[ https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744974#comment-14744974 ] ASF GitHub Bot commented on FLINK-2111: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-140301180 Hi, I cleaned up all old commits and put a new commit on top introducing `SourceFunction.stop()` and unblocking the stop signal using its own thread. Please give feedback. Btw: Travis fails due to an unstable test. My own Travis is green: https://travis-ci.org/mjsax/flink/builds/80344479 > Add "stop" signal to cleanly shutdown streaming jobs > > > Key: FLINK-2111 > URL: https://issues.apache.org/jira/browse/FLINK-2111 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime, JobManager, Local Runtime, > Streaming, TaskManager, Webfrontend >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > Currently, streaming jobs can only be stopped using the "cancel" command, which is > a "hard" stop with no clean shutdown. > The newly introduced "stop" signal will only affect streaming source tasks > such that the sources can stop emitting data and shut down cleanly, resulting > in a clean shutdown of the whole streaming job. > This feature is a prerequisite for > https://issues.apache.org/jira/browse/FLINK-1929
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-140301180 Hi, I cleaned up all old commits and put a new commit on top introducing `SourceFunction.stop()` and unblocking the stop signal using its own thread. Please give feedback. Btw: Travis fails due to an unstable test. My own Travis is green: https://travis-ci.org/mjsax/flink/builds/80344479
[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...
Github user lofifnc commented on the pull request: https://github.com/apache/flink/pull/1120#issuecomment-140301283 Hey, This is mostly for testing purposes. I'm working on some tooling which takes a `StreamExecutionEnvironment` and performs integration tests. In the current state I'm not able to work with a `StreamExecutionEnvironment` defined with the Scala API.
[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1046#issuecomment-140291773 As per discussion on the dev list, the `ExecutionConfig` has the `GlobalJobParameters`, which are useful if one type of config is used across all operators. If each of the operators needs its own config, can you create an abstract base class for the storm functions which takes a configuration as an argument? BTW: There is no plan to remove the `withParameters()` method in the batch API. It is just not the encouraged mechanism any more...
[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility
[ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744911#comment-14744911 ] ASF GitHub Bot commented on FLINK-2525: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1046#issuecomment-140291773 As per discussion on the dev list, the `ExecutionConfig` has the `GlobalJobParameters`, which are useful if one type of config is used across all operators. If each of the operators needs its own config, can you create an abstract base class for the storm functions which takes a configuration as an argument? BTW: There is no plan to remove the `withParameters()` method in the batch API. It is just not the encouraged mechanism any more... > Add configuration support in Storm-compatibility > > > Key: FLINK-2525 > URL: https://issues.apache.org/jira/browse/FLINK-2525 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: fangfengbin >Assignee: fangfengbin > > Spouts and Bolts are initialized by a call to `Spout.open(...)` and > `Bolt.prepare()`, respectively. Both methods have a config `Map` as first > parameter. This map is currently not populated. Thus, Spouts and Bolts cannot > be configured with user-defined parameters. In order to support this feature, > spout and bolt wrapper classes need to be extended to create a proper `Map` > object. Furthermore, the clients need to be extended to take a `Map`, > translate it into a Flink `Configuration` that is forwarded to the wrappers > for proper initialization of the map.
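Stephan's suggestion, an abstract base class for the Storm functions that takes a configuration as a constructor argument, can be sketched in plain Java. All class and method names below are hypothetical illustrations of the idea, not actual Flink or Storm API:

```java
import java.util.Map;

// Sketch of the per-operator configuration approach suggested above:
// each Storm-style function receives its own config map at construction
// time instead of relying on a populated global map. Names are made up
// for illustration (ConfigurableStormFunction, MyBolt do not exist).
public abstract class ConfigurableStormFunction {
    private final Map<String, Object> config;

    protected ConfigurableStormFunction(Map<String, Object> config) {
        this.config = config;
    }

    /** Look up a parameter, falling back to a default when unset. */
    protected Object getConfigValue(String key, Object defaultValue) {
        return config.getOrDefault(key, defaultValue);
    }

    // Minimal usage: a concrete function reading one parameter.
    static class MyBolt extends ConfigurableStormFunction {
        MyBolt(Map<String, Object> config) { super(config); }
        int parallelism() { return (int) getConfigValue("parallelism", 1); }
    }

    public static void main(String[] args) {
        MyBolt bolt = new MyBolt(Map.of("parallelism", 4));
        System.out.println(bolt.parallelism()); // 4
    }
}
```

As the follow-up comment notes, Storm itself shares one configuration across all spouts/bolts, so the simpler GlobalJobParameters route covers the actual use case; the per-operator base class would only matter if operators needed distinct configs.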
[GitHub] flink pull request: [hotfix][Table API tests]add toDataSet in tabl...
GitHub user HuangWHWHW opened a pull request: https://github.com/apache/flink/pull/1131 [hotfix][Table API tests]add toDataSet in table API tests See the discussion in the PR: https://github.com/apache/flink/pull/1098 for details. And I'm not sure whether this change needs a JIRA or can just be a hotfix. The problem is due to the ambiguous Table APIs between DataSet.scala and DataStream.scala. The class Table can call methods in both DataSet.scala and DataStream.scala. The class Table needs to specify `toDataSet` or `toDataStream` first when the same methods exist in both DataSet.scala and DataStream.scala. So I added the `toDataSet` (since there is only one such method in DataSet.scala currently, but there will be another in DataStream.scala) in these Table API tests for the follow-up work in DataStream.scala. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HuangWHWHW/flink FLINK-2622-toDataSet Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1131 commit 62741ca7cfe6620d94717fe9eb6909e34da763d0 Author: HuangWHWHW <404823...@qq.com> Date: 2015-09-15T06:28:17Z [hotfix][Table API tests]add toDataSet in table API tests
[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility
[ https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14744978#comment-14744978 ] ASF GitHub Bot commented on FLINK-2525: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1046#issuecomment-140301378 Storm only supports one global configuration that is shared over all spouts/bolts. So `GlobalJobParameter` will work just fine. > Add configuration support in Storm-compatibility > > > Key: FLINK-2525 > URL: https://issues.apache.org/jira/browse/FLINK-2525 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: fangfengbin >Assignee: fangfengbin > > Spouts and Bolts are initialized by a call to `Spout.open(...)` and > `Bolt.prepare()`, respectively. Both methods have a config `Map` as first > parameter. This map is currently not populated. Thus, Spouts and Bolts cannot > be configured with user-defined parameters. In order to support this feature, > spout and bolt wrapper classes need to be extended to create a proper `Map` > object. Furthermore, the clients need to be extended to take a `Map`, > translate it into a Flink `Configuration` that is forwarded to the wrappers > for proper initialization of the map.
[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1046#issuecomment-140301378 Storm only supports one global configuration that is shared over all spout/bolts. So `GlobalJobParameter` will work just fine.
[jira] [Closed] (FLINK-2669) Execution contains non-serializable field
[ https://issues.apache.org/jira/browse/FLINK-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-2669. -- Resolution: Won't Fix Yeah, I agree and I didn't mean to "fix" this (that's also why I've labelled it as minor). I saw that request job is just used for the memory archivist. And it was actually not what I needed. I know that there is nothing we can do about it at this point. > Execution contains non-serializable field > - > > Key: FLINK-2669 > URL: https://issues.apache.org/jira/browse/FLINK-2669 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: master, 0.9.1 >Reporter: Ufuk Celebi >Priority: Minor > > Execution is {{Serializable}}, but non-transient field {{assignedResource}} > of type {{SimpleSlot}} is not. > I've noticed this while sending a {{JobManagerMessages.RequestJob}} to a job > manager, which returns the {{ExecutionGraph}}.
[jira] [Created] (FLINK-2671) Instable Test StreamCheckpointNotifierITCase
Matthias J. Sax created FLINK-2671: -- Summary: Instable Test StreamCheckpointNotifierITCase Key: FLINK-2671 URL: https://issues.apache.org/jira/browse/FLINK-2671 Project: Flink Issue Type: Bug Components: Tests Reporter: Matthias J. Sax Priority: Critical {noformat} Failed tests: StreamCheckpointNotifierITCase>StreamFaultToleranceTestBase.runCheckpointedProgram:105->postSubmit:115 No checkpoint notification was received.{noformat} https://travis-ci.org/apache/flink/jobs/80344489
[GitHub] flink pull request: Hits
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-140314739 Hi @mfahimazizi, as I said, it is not possible to access the edge values inside the `VertexUpdateFunction`. However, you can get the same functionality if you add the edge value inside the message that you create in the `MessagingFunction`. Alternatively, you can build your own delta iteration, instead of using the vertex-centric model. If you're planning to finish this PR, then let us know and we can even sync on skype or so to help you out! Otherwise, please close this PR and hopefully someone else will pick up this issue. Thank you!
[jira] [Commented] (FLINK-2637) Add abstract equals, hashCode and toString methods to TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-2637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745761#comment-14745761 ] ASF GitHub Bot commented on FLINK-2637: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1134 [FLINK-2637] [api-breaking] [scala, types] Adds equals and hashCode method to TypeInformations and TypeSerializers Adds abstract `equals`, `hashCode`, `canEqual` and `toString` methods to `TypeInformation`. Adds missing implementations to subtypes. The `canEqual(Object obj)` method returns true iff the `obj` can be equaled with the called instance. Adds abstract `equals`, `hashCode` and `canEqual` methods to `TypeSerializer`. Makes `CompositeType` subtypes serializable by removing non-serializable fields which were only used for the comparator construction. The comparator construction is now realized within a builder object which keeps the intermediate state. Consequently, the PR #943 is now obsolete and can be closed. Refactors the `ObjectArrayTypeInfo` so that the type extraction logic now happens within the `TypeExtractor` and no longer in the `TypeInformation` subtype. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixOptionTypeInfo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1134.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1134 commit 35a18b3f5148ec3fdddc318ea4d5971427fffda3 Author: Till RohrmannDate: 2015-09-07T23:12:09Z [FLINK-2637] [api-breaking] [scala, types] Adds equals and hashCode method to TypeInformations and TypeSerializers. Fixes ObjectArrayTypeInfo. Makes CompositeTypes serializable. 
Adds test for equality relation's symmetric property > Add abstract equals, hashCode and toString methods to TypeInformation > - > > Key: FLINK-2637 > URL: https://issues.apache.org/jira/browse/FLINK-2637 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.9, 0.10 >Reporter: Fabian Hueske >Assignee: Till Rohrmann > Labels: starter > Fix For: 0.10 > > > Flink expects that implementations of {{TypeInformation}} have valid > implementations of {{hashCode}} and {{equals}}. However, the API does not > enforce to implement these methods. Hence, this is a common origin for bugs > such as for example FLINK-2633. > This can be avoided by adding abstract {{hashCode}} and {{equals}} methods to > TypeInformation. An abstract {{toString}} method could also be added. > This change will break the API and require fixing a couple of broken > {{TypeInformation}} implementations.
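The canEqual pattern the PR adds is what keeps equals() symmetric across a class hierarchy like TypeInformation's: each class declares which objects it can be compared with, and equals() consults the other object's canEqual(). A minimal sketch with simplified stand-in classes (not the actual Flink types):

```java
// Sketch of the canEqual pattern described in the PR above: canEqual(obj)
// returns true iff obj can be equaled with the called instance, which keeps
// equals() symmetric when subclasses add their own notion of equality.
// BasicTypeInfo/NumericTypeInfo here are simplified stand-ins.
public class CanEqualDemo {
    static class BasicTypeInfo {
        final Class<?> clazz;
        BasicTypeInfo(Class<?> clazz) { this.clazz = clazz; }

        public boolean canEqual(Object obj) { return obj instanceof BasicTypeInfo; }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof BasicTypeInfo)) return false;
            BasicTypeInfo other = (BasicTypeInfo) obj;
            // asking the OTHER object whether it accepts us is what makes
            // base.equals(sub) and sub.equals(base) agree
            return other.canEqual(this) && clazz.equals(other.clazz);
        }

        @Override
        public int hashCode() { return clazz.hashCode(); }
    }

    static class NumericTypeInfo extends BasicTypeInfo {
        NumericTypeInfo(Class<?> clazz) { super(clazz); }

        @Override
        public boolean canEqual(Object obj) { return obj instanceof NumericTypeInfo; }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof NumericTypeInfo
                    && ((NumericTypeInfo) obj).canEqual(this)
                    && super.equals(obj);
        }
    }

    public static void main(String[] args) {
        BasicTypeInfo a = new BasicTypeInfo(Integer.class);
        NumericTypeInfo b = new NumericTypeInfo(Integer.class);
        // symmetric: both directions agree the two are NOT equal
        System.out.println(a.equals(b) + " " + b.equals(a)); // false false
    }
}
```

Without the other.canEqual(this) call, a.equals(b) and b.equals(a) could disagree whenever a subclass tightens equality, which is exactly the symmetry violation the PR's new test guards against.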
[jira] [Commented] (FLINK-2410) PojoTypeInfo is not completely serializable
[ https://issues.apache.org/jira/browse/FLINK-2410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745762#comment-14745762 ] ASF GitHub Bot commented on FLINK-2410: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/943#issuecomment-140468581 The non-serializable problem of the `PojoTypeInfo` has been fixed in #1134. Thus, the PR can be closed. > PojoTypeInfo is not completely serializable > --- > > Key: FLINK-2410 > URL: https://issues.apache.org/jira/browse/FLINK-2410 > Project: Flink > Issue Type: Bug > Components: Java API >Reporter: Timo Walther >Assignee: Timo Walther > > Table API requires PojoTypeInfo to be serializable. The following code fails: > {code} > Table finishedEtlTable = maxMeasurements > .join(stationTable).where("s_station_id = m_station_id") > .select("year, month, day, value, country, name"); > DataSet<MaxTemperature> maxTemp = tableEnv.toDataSet(finishedEtlTable, > MaxTemperature.class); > maxTemp > .groupBy("year") > .sortGroup("value", Order.DESCENDING) > .first(1) > .print(); > {code}
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745055#comment-14745055 ] Chiwan Park commented on FLINK-1745: [~till.rohrmann] Could you assign this issue to [~danielblazevski]? I can't find him in the assignee list. > Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial > it is still used as a means to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
[jira] [Created] (FLINK-2672) Add partitioned output format to HDFS RollingSink
Mohamed Amine ABDESSEMED created FLINK-2672: --- Summary: Add partitioned output format to HDFS RollingSink Key: FLINK-2672 URL: https://issues.apache.org/jira/browse/FLINK-2672 Project: Flink Issue Type: Improvement Components: Streaming Connectors Affects Versions: 0.10 Reporter: Mohamed Amine ABDESSEMED Priority: Minor An interesting use case of the HDFS Sink is to dispatch data into multiple directories depending on attributes present in the source data. For example, for some data with a timestamp and a status field, we want to write it into different directories using a pattern like: /somepath/%{timestamp}/%{status} The expected results are something like: /somepath/some_timestamp/wellformed /somepath/some_timestamp/malformed /somepath/some_timestamp/incomplete ... etc To support this functionality the bucketing and checkpointing logic needs to be changed. Note: For now, this can be done using the current version of the Rolling HDFS Sink with the help of splitting data streams and having multiple HDFS sinks.
[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745078#comment-14745078 ] ASF GitHub Bot commented on FLINK-2167: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1127#issuecomment-140327645 Hi @twalthr, thanks for your contribution. But this PR contains many changes unrelated to HCatalog format. Maybe we should split this PR into HCatalog and other changes. > Add fromHCat() to TableEnvironment > -- > > Key: FLINK-2167 > URL: https://issues.apache.org/jira/browse/FLINK-2167 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Minor > Labels: starter > > Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} > from an HCatalog table. > The implementation could reuse Flink's HCatInputFormat.
[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1127#issuecomment-140327645 Hi @twalthr, thanks for your contribution. But this PR contains many changes unrelated to HCatalog format. Maybe we should split this PR into HCatalog and other changes.
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745094#comment-14745094 ] ASF GitHub Bot commented on FLINK-2641: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39488711

--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi
-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initialization
+        NETTY_BUFFERS=$((1024 * 1024))
+
+        if useOffHeapMemory; then
+            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+                # We split up the total memory in heap and off-heap memory
+                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+                    echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                    exit 1
+                fi
+                TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+            else
+                # We calculate the memory using a fraction of the total memory
+                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != "0" ]]; then
+                    echo "[ERROR] Configured TaskManager managed memory fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}"
--- End diff --

Add validity bounds 0 > x > 1 to message?
> Integrate the off-heap memory configuration with the TaskManager start script
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-2641
>                 URL: https://issues.apache.org/jira/browse/FLINK-2641
>             Project: Flink
>          Issue Type: New Feature
>          Components: Start-Stop Scripts
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Maximilian Michels
>             Fix For: 0.10
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory
> settings.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
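The heap/off-heap split the reviewed script performs can be sketched outside the shell. The following Python rendering of the arithmetic is illustrative only (function and parameter names are invented, not Flink code): an explicitly configured managed-memory size takes precedence; otherwise a fraction of the total memory is carved out for off-heap use.

```python
def split_task_manager_memory(total_mb, managed_size_mb=0, managed_fraction=0.7):
    """Split total TaskManager memory into (heap_mb, offheap_mb).

    Mirrors the logic under review in taskmanager.sh: an explicit managed
    size wins; otherwise a fraction of the total becomes off-heap memory.
    """
    if total_mb <= 0:
        raise ValueError("total memory must be positive")
    if managed_size_mb > 0:
        if total_mb <= managed_size_mb:
            raise ValueError("total memory must be larger than managed memory")
        offheap = managed_size_mb
    else:
        if not (0.0 < managed_fraction < 1.0):
            raise ValueError("managed memory fraction must be in (0, 1)")
        offheap = round(total_mb * managed_fraction)
    return total_mb - offheap, offheap
```

For example, 1024 MB total with an explicit 300 MB managed size yields a 724 MB heap, while 1000 MB with the 0.7 fraction yields a 300 MB heap and 700 MB off-heap.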
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39490684

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -1586,32 +1586,29 @@ object TaskManager {
           ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
           "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

    -      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
    -        fraction).toLong
    +      if (memType == MemoryType.HEAP) {

    -      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
    -        s" heap memory (${relativeMemSize >> 20} MB).")
    +        val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
    +          fraction).toLong

    -      relativeMemSize
    -    }
    -    else {
    -      val ratio = configuration.getFloat(
    -        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
    -        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
    -
    -      checkConfigParameter(ratio > 0.0f,
    -        ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
    -        "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
    -
    -      val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
    -      val relativeMemSize = (maxHeapSize * ratio).toLong
    +        LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
    +          s" heap memory (${relativeMemSize >> 20} MB).")
    +
    +        relativeMemSize
    +      } else if (memType == MemoryType.OFF_HEAP) {

    -      LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
    -        s"managed off-heap memory (${relativeMemSize >> 20} MB).")
    +        // The maximum heap memory has been adjusted according to the fraction
    +        val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong

    --- End diff --

    Shouldn't this be (HEAPSIZE / (1.0 - fraction)) - HEAPSIZE?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
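The review question above is about recovering the off-heap size from an already-shrunk heap. If the start script set heap = total * (1 - fraction), then total = heap / (1 - fraction), so dividing the heap by the fraction overestimates the budget; the off-heap share is heap / (1 - fraction) - heap. A hypothetical Python sketch of that arithmetic (names are illustrative, not Flink code):

```python
def offheap_from_adjusted_heap(heap_bytes, fraction):
    """Recover the off-heap managed memory size from an already-shrunk heap.

    If the start script set heap = total * (1 - fraction), then
    total = heap / (1 - fraction), and the off-heap share is
    total - heap = heap / (1 - fraction) - heap.
    """
    if not 0.0 < fraction < 1.0:
        raise ValueError("fraction must be in (0, 1)")
    return heap_bytes / (1.0 - fraction) - heap_bytes
```

With a total of 1000 bytes and fraction 0.7, the adjusted heap is 300 bytes, and the formula recovers the 700 off-heap bytes; dividing 300 by 0.7 instead would give roughly 429, which is why the reviewer flags the quoted line.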
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745170#comment-14745170 ]

ASF GitHub Bot commented on FLINK-2017:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1097#discussion_r39493200

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.utils;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Facility to manage required parameters in user defined functions.
    + */
    +public class RequiredParameter {
    +
    +	private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
    +	private static final String HELP_TEXT_LINE_DELIMITER = "\n";
    +
    +	private HashMap<String, Option> data;
    +
    +	public RequiredParameter() {
    +		this.data = new HashMap<>();
    +	}
    +
    +	public void add(Option option) {
    +		this.data.put(option.getName(), option);
    +	}
    +
    +	/**
    +	 * Check if all parameters defined as required have been supplied.
    +	 *
    +	 * @param parameterTool - parameters supplied by user.
    +	 */
    +	public void check(ParameterTool parameterTool) throws RequiredParameterException {
    +		for (Option o : data.values()) {
    +			// if the parameter is not present or its value is undefined, throw a RuntimeException.
    +			if (!parameterTool.data.containsKey(o.getName()) || keyIsUndefined(o.getName(), parameterTool.data)) {

    --- End diff --

    Check alternative name as well?

> Add predefined required parameters to ParameterTool
> ---------------------------------------------------
>
>                 Key: FLINK-2017
>                 URL: https://issues.apache.org/jira/browse/FLINK-2017
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>              Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters.
> The tool should also be able to print a help menu with a list of all
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
> @Test
> public void requiredParameters() {
> 	RequiredParameters required = new RequiredParameters();
> 	Option input = required.add("input").alt("i").help("Path to input file or directory"); // parameter with long and short variant
> 	required.add("output"); // parameter only with long variant
> 	Option parallelism = required.add("parallelism").alt("p").type(Integer.class); // parameter with type
> 	Option spOption = required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number specifying the number of parallel data source instances"); // parameter with default value, specifying the type.
> 	Option executionType = required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined", "batch");
> 	ParameterUtil parameter = ParameterUtil.fromArgs(new String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
> 	required.check(parameter);
> 	required.printHelp();
> 	required.checkAndPopulate(parameter);
> 	String inputString = input.get();
> 	int par = parallelism.getInteger();
> 	String output = parameter.get("output");
> 	int sourcePar = parameter.getInteger(spOption.getName());
> }
> {code}
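The required-parameter check under review can be sketched independently of the Flink API. This hypothetical Python analogue (all names invented) also consults the alternative (short) spelling of an option, which is exactly what the review comment asks for:

```python
class RequiredParametersError(Exception):
    """Raised when a required parameter is missing or undefined."""


class RequiredParameters:
    """Collect required option names and verify they were all supplied."""

    def __init__(self):
        self.options = {}  # canonical name -> set of accepted spellings

    def add(self, name, alt=None):
        # Accept both the long name and an optional short/alternative name.
        self.options[name] = {name} | ({alt} if alt else set())

    def check(self, supplied):
        """supplied maps parameter names to values; None counts as undefined."""
        for name, spellings in self.options.items():
            if not any(supplied.get(s) is not None for s in spellings):
                raise RequiredParametersError("missing required parameter: " + name)
```

A parameter supplied only under its short spelling (e.g. `-i` for `input`) then still satisfies the check, avoiding the false negative the reviewer points out.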
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39493894

    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    [...]
    +            else
    +                # We calculate the memory using a fraction of the total memory
    +                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != "0" ]]; then
    +                    echo "[ERROR] Configured TaskManager managed memory fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}"

    --- End diff --

    Thanks, added a lower bound.
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39493902

    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    [...]
    +        TM_HEAP_SIZE=${FLINK_TM_HEAP}
    +        TM_OFFHEAP_SIZE=0
    +        # some space for Netty initilization

    --- End diff --

    Thanks
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745063#comment-14745063 ]

Till Rohrmann commented on FLINK-1745:
--------------------------------------

Done.

> Add exact k-nearest-neighbours algorithm to machine learning library
> --------------------------------------------------------------------
>
>                 Key: FLINK-1745
>                 URL: https://issues.apache.org/jira/browse/FLINK-1745
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Till Rohrmann
>            Assignee: Daniel Blazevski
>              Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial,
> it is still used as a means to classify data and to do regression. This issue
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
[jira] [Updated] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-1745:
---------------------------------
    Assignee: Daniel Blazevski
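As a single-machine baseline for the distributed H-BNLJ/H-BRJ variants the ticket references, exact kNN reduces to a brute-force sort of all points by distance to the query. A minimal Python sketch (illustrative only, not the Flink ML implementation):

```python
def knn(points, query, k):
    """Exact k-nearest-neighbours by brute force.

    Sorts all points by squared Euclidean distance to the query and
    returns the k closest; the distributed variants in the ticket
    parallelize exactly this join between query and training points.
    """
    by_distance = sorted(
        points,
        key=lambda p: sum((a - b) ** 2 for a, b in zip(p, query)),
    )
    return by_distance[:k]
```

This O(n log n) baseline is what the H-BNLJ (block nested loop join) and H-BRJ approaches distribute across partitions.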
[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745062#comment-14745062 ]

ASF GitHub Bot commented on FLINK-2583:
---------------------------------------

Github user aminouvic commented on the pull request:

    https://github.com/apache/flink/pull/1084#issuecomment-140325179

    Yeah, you're right, better to have an operational version of the sink first. Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672

> Add Stream Sink For Rolling HDFS Files
> --------------------------------------
>
>                 Key: FLINK-2583
>                 URL: https://issues.apache.org/jira/browse/FLINK-2583
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 0.10
>
> In addition to having configurable file-rolling behavior, the Sink should also
> integrate with checkpointing to make it possible to have exactly-once
> semantics throughout the topology.
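The file-rolling behavior the ticket describes can be sketched in a few lines, leaving out the checkpoint integration needed for exactly-once semantics. This toy Python version writes to a local directory rather than HDFS, and all names are illustrative:

```python
import os


class RollingSink:
    """Toy rolling file sink: starts a new part file once the current
    one would exceed max_bytes. Checkpoint integration is omitted."""

    def __init__(self, directory, max_bytes):
        self.directory = directory
        self.max_bytes = max_bytes
        self.part = 0        # index of the current part file
        self.written = 0     # bytes written to the current part file

    def _path(self):
        return os.path.join(self.directory, "part-%d" % self.part)

    def write(self, record):
        data = record.encode("utf-8") + b"\n"
        # Roll to a new part file when this record would overflow the budget.
        if self.written > 0 and self.written + len(data) > self.max_bytes:
            self.part += 1
            self.written = 0
        with open(self._path(), "ab") as f:
            f.write(data)
        self.written += len(data)
```

A production sink additionally needs to track in-progress/pending/committed files so that, on recovery from a checkpoint, partially written output can be truncated or discarded.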
[GitHub] flink pull request: [FLINK-2665] [api-breaking] [runtime] Makes Ex...
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1128#issuecomment-140325565

    LGTM, will merge this
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39488877

    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    [...]
    +        TM_HEAP_SIZE=${FLINK_TM_HEAP}
    +        TM_OFFHEAP_SIZE=0
    +        # some space for Netty initilization
    +        NETTY_BUFFERS=$((1024 * 1024))

    --- End diff --

    Are we sure that 1MB is enough for Netty?
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745096#comment-14745096 ]

ASF GitHub Bot commented on FLINK-2641:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39488925

    --- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
    @@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"

     # Define FLINK_JM_HEAP if it is not already set
     if [ -z "${FLINK_JM_HEAP}" ]; then
    -    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
    +    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
     fi

     # Define FLINK_TM_HEAP if it is not already set
     if [ -z "${FLINK_TM_HEAP}" ]; then
    -    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
    +    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
    +fi
    +
    +# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
    +if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
    +    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
    +fi
    +
    +# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
    +if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
    +    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
    +fi
    +
    +# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
    +if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
    +    BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
    +    if [ "${BUFFER_SIZE}" -eq "0" ]; then

    --- End diff --

    `KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` is the deprecated key. In the existing Flink code, it is overridden by the new default network buffer size key `KEY_TASKM_MEM_SEGMENT_SIZE`.
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745098#comment-14745098 ]

ASF GitHub Bot commented on FLINK-2641:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1129#discussion_r39488999

    --- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
    [...]
    +            else
    +                # We calculate the memory using a fraction of the total memory
    +                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != "0" ]]; then
    +                    echo "[ERROR] Configured TaskManager managed memory fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}"
    +                    exit 1
    +                fi
    +                # recalculate the JVM heap memory by taking the off-heap ratio into account
    +                TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
    +                TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
    +            fi
    +        fi
    +
    +        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE * 1024 * 1024 + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))"

    --- End diff --

    Append "M" to the -XX:MaxDirectMemorySize parameter?
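The JVM flags the script ultimately exports combine the computed heap size with a direct-memory budget covering managed off-heap memory, network buffers, and Netty headroom. A hypothetical Python sketch of that composition (names are illustrative; note that without an `M` suffix the `-XX:MaxDirectMemorySize` value is interpreted in bytes, which is why the shell line multiplies the off-heap megabytes by 1024 * 1024):

```python
def jvm_memory_args(heap_mb, offheap_mb, network_bytes, netty_bytes=1024 * 1024):
    """Compose the JVM memory flags the start script exports.

    MaxDirectMemorySize must cover the managed off-heap memory, the
    network buffers, and some headroom for Netty's own allocations.
    The direct-memory value is emitted in bytes (no unit suffix).
    """
    direct_bytes = offheap_mb * 1024 * 1024 + network_bytes + netty_bytes
    return "-Xms%dM -Xmx%dM -XX:MaxDirectMemorySize=%d" % (
        heap_mb, heap_mb, direct_bytes)
```

For example, a 300 MB heap with 700 MB managed off-heap memory and no extra buffers yields `-Xms300M -Xmx300M -XX:MaxDirectMemorySize=734003200`.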
[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...
Github user StephanEwen commented on the pull request:
https://github.com/apache/flink/pull/1120#issuecomment-140326566

I think adding this accessor is fine. Also, in case someone goes crazy and wants to mix the Java and Scala-style functions in one program ;-)

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745084#comment-14745084 ]

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1129#discussion_r39488168

--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"

 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi

 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+    FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+    FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+    BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
+    if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

Shouldn't it be the other way round, i.e., use `KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` if defined and `KEY_TASKM_MEM_SEGMENT_SIZE` otherwise?
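The lookup order the reviewer asks about can be sketched as follows. `readFromConfig` is stubbed here to always return its default, and the key strings are illustrative placeholders, not the actual Flink config keys; the sketch only demonstrates the proposed precedence (explicit network buffer size first, segment size as fallback):

```shell
# Stub standing in for the real readFromConfig helper: the real one reads KEY
# from the YAML config and falls back to DEFAULT; this stub always returns
# DEFAULT so the fallback path below is taken.
readFromConfig() { # usage: readFromConfig KEY DEFAULT
    echo "$2"
}

# Prefer an explicitly configured network buffer size (hypothetical key) ...
NETWORK_BUFFER_SIZE=$(readFromConfig "network.buffer.size" 0)
if [ "${NETWORK_BUFFER_SIZE}" -eq 0 ]; then
    # ... and only fall back to the memory segment size when it is unset.
    NETWORK_BUFFER_SIZE=$(readFromConfig "memory.segment.size" 32768)
fi
echo "${NETWORK_BUFFER_SIZE}"
```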
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745092#comment-14745092 ]

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1129#discussion_r39488510

--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi

-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi

-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initilization
--- End diff --

typo +"a"
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745118#comment-14745118 ]

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1129#discussion_r39489771

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {

-    LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
-      s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+      // The maximum heap memory has been adjusted according to the fraction
+      val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong

-    relativeMemSize
+      LOG.info(s"Using $fraction of the maximum memory size for " +
+        s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+      directMemorySize
+    } else {
--- End diff --

line break
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745117#comment-14745117 ]

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1129#discussion_r39489755

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

inconsistent code style, linebreak after closing `'}'`
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/750#issuecomment-140335366

IMO, it would be better to wrap the `stopExecution` call in the `TaskManager` into a `Future`. This has the following reasons: First of all, with the current implementation, you'll miss all exceptions which occur in the `stop` method. Secondly, you will send a positive `TaskOperationResult` back before the stopping was executed. I haven't checked the semantics of the `TaskOperationResult`, but it might be the case that the JM, upon receiving this message, thinks that the stop call was successfully executed.
[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs
[ https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745121#comment-14745121 ]

ASF GitHub Bot commented on FLINK-2111:
---

> Add "stop" signal to cleanly shutdown streaming jobs
> ----------------------------------------------------
>
>                 Key: FLINK-2111
>                 URL: https://issues.apache.org/jira/browse/FLINK-2111
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime, JobManager, Local Runtime, Streaming, TaskManager, Webfrontend
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, such that the sources can stop emitting data and shut down cleanly, resulting in a clean shutdown of the whole streaming job.
> This feature is a pre-requirement for https://issues.apache.org/jira/browse/FLINK-1929

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1129#discussion_r39494490

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {

-    LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
-      s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+      // The maximum heap memory has been adjusted according to the fraction
+      val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Never mind, it's (HEAPSIZE / (1.0 - fraction)) * fraction..
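The arithmetic in mxm's comment can be checked with small numbers. The sketch below uses hypothetical values and integer math (the real script uses `bc` with a fractional value) to stay self-contained: the start script sets the JVM heap to the total minus the managed fraction, and the TaskManager can then recover the managed off-heap size from the heap size alone via heap / (1 - f) * f:

```shell
# Hypothetical configuration: 1000 MB total, managed fraction f = 7/10 = 0.7,
# expressed as a numerator/denominator pair to keep the arithmetic integral.
TOTAL_MB=1000
FRACTION_NUM=7
FRACTION_DEN=10

# Start script side: carve the managed fraction out of the total.
OFFHEAP_MB=$((TOTAL_MB * FRACTION_NUM / FRACTION_DEN))   # total * f
HEAP_MB=$((TOTAL_MB - OFFHEAP_MB))                       # becomes -Xms/-Xmx

# JVM side: only HEAP_MB (= total * (1 - f)) is visible, so the managed
# off-heap size is recovered as heap / (1 - f) * f, matching mxm's formula.
RECOVERED_MB=$((HEAP_MB * FRACTION_NUM / (FRACTION_DEN - FRACTION_NUM)))

echo "heap=${HEAP_MB} offheap=${OFFHEAP_MB} recovered=${RECOVERED_MB}"
```

With these numbers the heap is 300 MB, the off-heap share 700 MB, and the recovered value equals the off-heap share, confirming the identity.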
[jira] [Resolved] (FLINK-2582) Document how to build Flink with other Scala versions
[ https://issues.apache.org/jira/browse/FLINK-2582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chiwan Park resolved FLINK-2582.
Resolution: Fixed

> Document how to build Flink with other Scala versions
> -----------------------------------------------------
>
>                 Key: FLINK-2582
>                 URL: https://issues.apache.org/jira/browse/FLINK-2582
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.10
>
> One can build Flink for different Scala versions.
> We should describe in the documentation how to do that, ideally next to building for different Hadoop versions.
[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1097#discussion_r39492715

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+    private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+    private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+    private HashMap<String, Option> data;
+
+    public RequiredParameter() {
+        this.data = new HashMap<>();
+    }
+
+    public void add(Option option) {
+        this.data.put(option.getName(), option);
--- End diff --

Check overwriting of existing option with same name?
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745167#comment-14745167 ]

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1097#discussion_r39492715
> Add predefined required parameters to ParameterTool
> ---------------------------------------------------
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters.
> The tool should also be able to print a help menu with a list of all parameters.
> This test case shows my initial ideas how to design the API
> {code}
> @Test
> public void requiredParameters() {
>     RequiredParameters required = new RequiredParameters();
>     Option input = required.add("input").alt("i").help("Path to input file or directory"); // parameter with long and short variant
>     required.add("output"); // parameter only with long variant
>     Option parallelism = required.add("parallelism").alt("p").type(Integer.class); // parameter with type
>     Option spOption = required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number specifying the number of parallel data source instances"); // parameter with default value, specifying the type.
>     Option executionType = required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined", "batch");
>     ParameterUtil parameter = ParameterUtil.fromArgs(new String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>     required.check(parameter);
>     required.printHelp();
>     required.checkAndPopulate(parameter);
>     String inputString = input.get();
>     int par = parallelism.getInteger();
>     String output = parameter.get("output");
>     int sourcePar = parameter.getInteger(spOption.getName());
> }
> {code}
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745171#comment-14745171 ]

ASF GitHub Bot commented on FLINK-2017:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1097#discussion_r39493290

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+    private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+    private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+    private HashMap<String, Option> data;
+
+    public RequiredParameter() {
+        this.data = new HashMap<>();
+    }
+
+    public void add(Option option) {
+        this.data.put(option.getName(), option);
+    }
+
+    /**
+     * Check if all parameters defined as required have been supplied.
+     *
+     * @param parameterTool - parameters supplied by user.
+     */
+    public void check(ParameterTool parameterTool) throws RequiredParameterException {
+        for (Option o : data.values()) {
+            // if the parameter is not present or its value is undefined, throw a RuntimeException.
+            if (!parameterTool.data.containsKey(o.getName()) || keyIsUndefined(o.getName(), parameterTool.data)) {
+                throw new RequiredParameterException("Required parameter " + o.getName() + " not present.");
+            }
+        }
+    }
+
+    /**
+     * Check if all parameters defined as required have been supplied. If not use the default values
+     * which have been supplied. If no default value is supplied for a missing parameter, an exception is thrown.
+     *
+     * @param parameterTool - parameters supplied by the user.
+     */
+    public void checkAndPopulate(ParameterTool parameterTool) throws RequiredParameterException {
+        for (Option o : data.values()) {
+            String key = o.getName();
--- End diff --

Check for alternative / short key as well
[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1097#discussion_r39493290

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java ---
--- End diff --

Check for alternative / short key as well
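The "check for alternative / short key as well" suggestion amounts to a two-step lookup: try the long name, then fall back to the short variant. A minimal sketch of that lookup, with hypothetical names rather than the actual Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Resolve a parameter value by its long name first, then fall back to the
// short (alternative) variant, as the review suggests.
class KeyLookup {
    static String resolve(Map<String, String> supplied, String longName, String shortName) {
        if (supplied.containsKey(longName)) {
            return supplied.get(longName);
        }
        if (shortName != null && supplied.containsKey(shortName)) {
            return supplied.get(shortName);
        }
        return null; // caller may then apply a default value or throw
    }
}
```

With args `{"p": "15"}`, resolving `("parallelism", "p")` finds the short variant, while a parameter supplied under neither name resolves to null and triggers the default-or-exception path.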
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745182#comment-14745182 ]

ASF GitHub Bot commented on FLINK-2017:
---
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1097#issuecomment-140343995

Thanks for the PR @mliesenberg! Can you add the checks (and tests to verify the checks are working) that I mentioned in my comments? The original JIRA issue also included type checks. I think we could restrict those to Java primitives (Integer, Long, Double, Float, Boolean). I would be OK with adding this PR without type checks (and opening another JIRA for that) but it would be a cool feature if you'd like to add that.
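A type check restricted to the primitive wrappers the reviewer lists (Integer, Long, Double, Float, Boolean) could be as simple as attempting to parse the supplied string. This is an illustrative sketch, not the Flink implementation:

```java
// Validate a string parameter value against a small set of Java primitive
// wrapper types, as discussed in the review. Illustrative code only.
class TypeCheck {
    static boolean isValid(String value, Class<?> type) {
        try {
            if (type == Integer.class) {
                Integer.parseInt(value);
            } else if (type == Long.class) {
                Long.parseLong(value);
            } else if (type == Double.class) {
                Double.parseDouble(value);
            } else if (type == Float.class) {
                Float.parseFloat(value);
            } else if (type == Boolean.class) {
                // Boolean.parseBoolean never throws, so check the literal explicitly
                return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
            } else {
                return type == String.class; // any value is a valid String
            }
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

The Boolean branch needs the explicit literal comparison because `Boolean.parseBoolean("yes")` would quietly return false instead of signalling an invalid value.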
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745184#comment-14745184 ]

ASF GitHub Bot commented on FLINK-2641:
---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493902

--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi

-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi

-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initilization
--- End diff --

Thanks

> Integrate the off-heap memory configuration with the TaskManager start script
> -----------------------------------------------------------------------------
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
> Issue Type: New Feature
> Components: Start-Stop Scripts
> Affects Versions: 0.10
> Reporter: Stephan Ewen
> Assignee: Maximilian Michels
> Fix For: 0.10
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory
> settings.
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493918

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Thanks.
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745186#comment-14745186 ]

ASF GitHub Bot commented on FLINK-2641:
---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493933

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {

-    LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
-      s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+      // The maximum heap memory has been adjusted according to the fraction
+      val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
-    relativeMemSize
+
+      LOG.info(s"Using $fraction of the maximum memory size for " +
+        s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+      directMemorySize
+    } else {
--- End diff --

Thanks
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493898

--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi

-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi

-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initilization
+        NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Yes, actually it needs much less. This is just some static initialization code Netty runs.
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493879

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -1586,32 +1586,29 @@ object TaskManager {
       ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
       "MemoryManager fraction of the free memory must be between 0.0 and 1.0")

-    val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-      fraction).toLong
+    if (memType == MemoryType.HEAP) {

-    LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-      s" heap memory (${relativeMemSize >> 20} MB).")
+      val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+        fraction).toLong

-    relativeMemSize
-  }
-  else {
-    val ratio = configuration.getFloat(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-
-    checkConfigParameter(ratio > 0.0f,
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-      "MemoryManager ratio (off-heap memory / heap size) must be larger than zero")
-
-    val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-    val relativeMemSize = (maxHeapSize * ratio).toLong
+      LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
+        s" heap memory (${relativeMemSize >> 20} MB).")
+
+      relativeMemSize
+    } else if (memType == MemoryType.OFF_HEAP) {

-    LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " +
-      s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+      // The maximum heap memory has been adjusted according to the fraction
+      val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Yes, actually it is (HEAPSIZE / fraction) * (1.0 - fraction) :)
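The relation mxm confirms above, offHeap = (HEAPSIZE / fraction) * (1.0 - fraction), can be checked with a tiny worked example (the numbers and class below are illustrative only, not the Flink code): with a 750 MB heap and a fraction of 0.75, the reconstructed total budget is 750 / 0.75 = 1000 MB, of which the remaining 250 MB is off-heap.

```java
// Worked example of the relation confirmed in the review:
//   offHeapSize = (heapSize / fraction) * (1.0 - fraction)
// i.e. recover the total memory budget from the heap size and the fraction,
// then take the remainder as off-heap. Illustrative code only.
class OffHeapMath {
    static long offHeapSizeMb(long heapSizeMb, double fraction) {
        double totalMb = heapSizeMb / fraction;       // reconstruct the total budget
        return (long) (totalMb * (1.0 - fraction));   // the rest is off-heap
    }
}
```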
[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1097#issuecomment-140343995
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745181#comment-14745181 ]

ASF GitHub Bot commented on FLINK-2641:
---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493894

--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
         fi
     fi

-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi

-    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+        TM_HEAP_SIZE=${FLINK_TM_HEAP}
+        TM_OFFHEAP_SIZE=0
+        # some space for Netty initilization
+        NETTY_BUFFERS=$((1024 * 1024))
+
+        if useOffHeapMemory; then
+            if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+                # We split up the total memory in heap and off-heap memory
+                if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+                    echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                    exit 1
+                fi
+                TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+            else
+                # We calculate the memory using a fraction of the total memory
+                if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != "0" ]]; then
+                    echo "[ERROR] Configured TaskManager managed memory fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}"
--- End diff --

Thanks, added a lower bound.
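The bounds check discussed here (the script already rejects fractions >= 1.0, and the reply adds a lower bound) reduces to requiring the fraction to lie strictly between 0 and 1. A minimal sketch, with an illustrative class name rather than the Flink source:

```java
// The managed-memory fraction must lie strictly between 0 and 1:
// 0 would leave no managed memory, 1 would leave no heap.
class FractionCheck {
    static boolean isValidFraction(double fraction) {
        return fraction > 0.0 && fraction < 1.0;
    }
}
```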
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493933

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
--- End diff --

Thanks
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745183#comment-14745183 ]

ASF GitHub Bot commented on FLINK-2641:
---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493898

--- End diff --

Yes, actually it needs much less. This is just some static initialization code Netty runs.
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745185#comment-14745185 ]

ASF GitHub Bot commented on FLINK-2641:
---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493918

--- End diff --

Thanks.
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745180#comment-14745180 ] ASF GitHub Bot commented on FLINK-2641: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1129#discussion_r39493879 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1586,32 +1586,29 @@ object TaskManager { ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, "MemoryManager fraction of the free memory must be between 0.0 and 1.0") - val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * -fraction).toLong + if (memType == MemoryType.HEAP) { - LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + -s" heap memory (${relativeMemSize >> 20} MB).") +val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * + fraction).toLong - relativeMemSize -} -else { - val ratio = configuration.getFloat( -ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY, -ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO) - - checkConfigParameter(ratio > 0.0f, -ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY, -"MemoryManager ratio (off-heap memory / heap size) must be larger than zero") - - val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory() - val relativeMemSize = (maxHeapSize * ratio).toLong +LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + + s" heap memory (${relativeMemSize >> 20} MB).") + +relativeMemSize + } else if (memType == MemoryType.OFF_HEAP) { - LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " + -s"managed off-heap memory (${relativeMemSize >> 20} MB).") +// The maximum heap memory has been adjusted according to the fraction +val directMemorySize = (EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong --- End diff -- Yes, actually it is (HEAPSIZE / fraction) * 
(1.0 - fraction) :) > Integrate the off-heap memory configuration with the TaskManager start script > - > > Key: FLINK-2641 > URL: https://issues.apache.org/jira/browse/FLINK-2641 > Project: Flink > Issue Type: New Feature > Components: Start-Stop Scripts >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Maximilian Michels > Fix For: 0.10 > > > The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and > {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory > settings.
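The arithmetic in mxm's reply is easy to sanity-check numerically. The sketch below is illustrative only (not Flink's actual TaskManager code): it assumes the JVM heap has already been scaled down to `total * fraction` of the memory budget, and recovers the off-heap direct memory size as `(heap / fraction) * (1.0 - fraction)`, as the comment describes.

```java
// Illustrative sketch (not Flink code): recovering the -XX:MaxDirectMemorySize
// value from an already fraction-adjusted heap size, per the review discussion.
class OffHeapSizing {

    // Assumption: heapBytes == totalBytes * fraction; the remainder goes off-heap.
    static long directMemoryBytes(long heapBytes, double fraction) {
        return (long) ((heapBytes / fraction) * (1.0 - fraction));
    }

    public static void main(String[] args) {
        long heap = 700L << 20; // 700 MB heap, configured with fraction = 0.7
        long direct = directMemoryBytes(heap, 0.7);
        System.out.println(direct >> 20); // prints 300 (MB of direct memory)
    }
}
```

With a 1000 MB budget and fraction 0.7, the heap keeps 700 MB and the formula hands the remaining 300 MB to direct memory, which matches the `(HEAPSIZE / fraction) * (1.0 - fraction)` correction in the comment.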
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745054#comment-14745054 ] Chiwan Park commented on FLINK-1745: Hi [~danielblazevski], you can modify my implementation by pulling my issue branch (https://github.com/chiwanpark/flink/tree/FLINK-1745). After implementing it, please open a pull request for review. Then I'll close my PR and review yours. If you have any questions about my implementation, post them to this thread. :-) > Add exact k-nearest-neighbours algorithm to machine learning library > > > Key: FLINK-1745 > URL: https://issues.apache.org/jira/browse/FLINK-1745 > Project: Flink > Issue Type: New Feature > Components: Machine Learning Library >Reporter: Till Rohrmann > Labels: ML, Starter > > Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, > it is still used as a means to classify data and to do regression. This issue > focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as > proposed in [2]. > Could be a starter task. > Resources: > [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm] > [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
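For context on what the exact kNN computation does, here is a minimal brute-force sketch. This is illustrative only; the FLINK-1745 implementation distributes this nested-loop/join-style computation across the cluster as H-BNLJ or H-BRJ per [2], and class and method names here are assumptions, not Flink's API.

```java
import java.util.Arrays;
import java.util.Comparator;

// Illustrative exact kNN by brute force: sort all points by distance to the
// query and keep the k closest. Quadratic overall, hence the distributed variants.
class BruteForceKnn {

    static double[][] kNearest(double[][] points, double[] query, int k) {
        return Arrays.stream(points)
                .sorted(Comparator.comparingDouble((double[] p) -> squaredDistance(p, query)))
                .limit(k)
                .toArray(double[][]::new);
    }

    // Squared Euclidean distance; the square root is monotone, so it can be skipped.
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    public static void main(String[] args) {
        double[][] pts = {{0, 0}, {1, 1}, {5, 5}, {2, 2}};
        System.out.println(Arrays.deepToString(kNearest(pts, new double[]{0, 0}, 2)));
        // prints [[0.0, 0.0], [1.0, 1.0]]
    }
}
```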
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1129#issuecomment-140336732 Thanks for the PR @mxm. I left a few comments inline. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745132#comment-14745132 ] ASF GitHub Bot commented on FLINK-2641: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1129#issuecomment-140336732 Thanks for the PR @mxm. I left a few comments inline. > Integrate the off-heap memory configuration with the TaskManager start script > - > > Key: FLINK-2641 > URL: https://issues.apache.org/jira/browse/FLINK-2641 > Project: Flink > Issue Type: New Feature > Components: Start-Stop Scripts >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Maximilian Michels > Fix For: 0.10 > > > The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and > {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory > settings.
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745161#comment-14745161 ] ASF GitHub Bot commented on FLINK-2017: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1097#discussion_r39492448 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Internal representation of a parameter passed to a user defined function. + */ +public class Option { + + private String longName; + private String shortName; + + private T defaultValue; + private Set choices; + + private String helpText; + + public Option(String name) { + this.longName = name; + this.choices = new HashSet<>(); + } + + /** +* Define a alternative / short name of the Parameter. 
+* +* @param shortName - short version of the parameter name +* @return the updated Option +*/ + public Option alt(String shortName) { + this.shortName = shortName; + return this; + } + + /** +* Define a default value for the option. +* +* Throws an exception if the list of possible values for the parameter is not empty and the default value passed +* is not in the list. +* +* @param defaultValue - the default value +* @return the updated Option +*/ + public Option defaultValue(T defaultValue) { + if (this.choices.isEmpty()) { + return this.setDefaultValue(defaultValue); + } else { + if (this.choices.contains(defaultValue)) { + return this.setDefaultValue(defaultValue); + } else { + throw new IllegalArgumentException("defaultValue passed is not in the list of expected values."); + } + } + } + + /** +* Restrict the list of possible values of the parameter. +* +* @param choices - the allowed values of the parameter. +* @return the updated Option +*/ + public Option choices(T... choices) { + Collections.addAll(this.choices, choices); --- End diff -- Add check that `defaultValue` (if already set) is in choices. > Add predefined required parameters to ParameterTool > --- > > Key: FLINK-2017 > URL: https://issues.apache.org/jira/browse/FLINK-2017 > Project: Flink > Issue Type: Improvement >Affects Versions: 0.9 >Reporter: Robert Metzger > Labels: starter > > In FLINK-1525 we've added the {{ParameterTool}}. > During the PR review, there was a request for required parameters. > This issue is about implementing a facility to define required parameters. > The tool should also be able to print a help menu with a list of all > parameters. 
> This test case shows my initial ideas how to design the API > {code} > @Test > public void requiredParameters() { > RequiredParameters required = new RequiredParameters(); > Option input = required.add("input").alt("i").help("Path to > input file or directory"); // parameter with long and short variant > required.add("output"); // parameter only with long variant > Option parallelism = > required.add("parallelism").alt("p").type(Integer.class); // parameter with > type > Option spOption = > required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number > specifying the number of parallel data source instances");
[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1097#discussion_r39492448 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Internal representation of a parameter passed to a user defined function. + */ +public class Option { + + private String longName; + private String shortName; + + private T defaultValue; + private Set choices; + + private String helpText; + + public Option(String name) { + this.longName = name; + this.choices = new HashSet<>(); + } + + /** +* Define a alternative / short name of the Parameter. +* +* @param shortName - short version of the parameter name +* @return the updated Option +*/ + public Option alt(String shortName) { + this.shortName = shortName; + return this; + } + + /** +* Define a default value for the option. +* +* Throws an exception if the list of possible values for the parameter is not empty and the default value passed +* is not in the list. 
+* +* @param defaultValue - the default value +* @return the updated Option +*/ + public Option defaultValue(T defaultValue) { + if (this.choices.isEmpty()) { + return this.setDefaultValue(defaultValue); + } else { + if (this.choices.contains(defaultValue)) { + return this.setDefaultValue(defaultValue); + } else { + throw new IllegalArgumentException("defaultValue passed is not in the list of expected values."); + } + } + } + + /** +* Restrict the list of possible values of the parameter. +* +* @param choices - the allowed values of the parameter. +* @return the updated Option +*/ + public Option choices(T... choices) { + Collections.addAll(this.choices, choices); --- End diff -- Add check that `defaultValue` (if already set) is in choices. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers
[ https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745592#comment-14745592 ] ASF GitHub Bot commented on FLINK-2675: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1133 [FLINK-2675] [streaming] Add utilities for scheduled triggers. These utilities are used by processing time triggers to schedule evaluations for the future. They are the first part of reworking the streaming windows to make them robust and faster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink triggers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1133 commit f57d0d68a691a518482935fed9290efad6f30dbd Author: Stephan EwenDate: 2015-09-15T12:55:41Z [FLINK-2675] [streaming] Add utilities for scheduled triggers. > Add utilities for scheduled triggers > > > Key: FLINK-2675 > URL: https://issues.apache.org/jira/browse/FLINK-2675 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > These utilities help schedule triggers for the future, ensure non-concurrent > trigger execution, and proper trigger shutdown and release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1133 [FLINK-2675] [streaming] Add utilities for scheduled triggers. These utilities are used by processing time triggers to schedule evaluations for the future. They are the first part of reworking the streaming windows to make them robust and faster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink triggers Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1133 commit f57d0d68a691a518482935fed9290efad6f30dbd Author: Stephan Ewen Date: 2015-09-15T12:55:41Z [FLINK-2675] [streaming] Add utilities for scheduled triggers.
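The guarantees described for `TriggerTimer` (single-threaded execution, so at most one trigger runs at a time, and order-preserving scheduling) map naturally onto a single-threaded `ScheduledExecutorService`. A minimal sketch of that pattern, assuming nothing about the actual `TriggerTimer` internals beyond what the PR text and Javadoc state:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the scheduling pattern: one daemon timer thread executes all
// triggers, so triggers never run concurrently and later deadlines fire
// after earlier ones. Class and method names are illustrative.
class SingleThreadedTimerSketch {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "Trigger-Timer");
                t.setDaemon(true); // do not keep the JVM alive for pending triggers
                return t;
            });

    // Schedule a target for an absolute processing-time timestamp.
    public void scheduleAt(long timestampMillis, Runnable target) {
        long delay = Math.max(0, timestampMillis - System.currentTimeMillis());
        executor.schedule(target, delay, TimeUnit.MILLISECONDS);
    }

    // Release the timer thread; pending triggers are dropped.
    public void shutdown() {
        executor.shutdownNow();
    }
}
```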
[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers
[ https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745616#comment-14745616 ] ASF GitHub Bot commented on FLINK-2675: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1133#discussion_r39526103 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, + * which means that never more than one trigger will be executed at the same time. + * + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at + * different times, the one scheduled for the later time will be executed after the one scheduled for the + * earlier time. 
+ */ +public class TriggerTimer { + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); + + /** The executor service that */ --- End diff -- ah, right ;-) > Add utilities for scheduled triggers > > > Key: FLINK-2675 > URL: https://issues.apache.org/jira/browse/FLINK-2675 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > These utilities help schedule triggers for the future, ensure non-concurrent > trigger execution, and proper trigger shutdown and release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [hotfix][Table API tests]add toDataSet in tabl...
Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1131#issuecomment-140420760 +1 from me too.
[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/1133#discussion_r39526103 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, + * which means that never more than one trigger will be executed at the same time. + * + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at + * different times, the one scheduled for the later time will be executed after the one scheduled for the + * earlier time. 
+ */ +public class TriggerTimer { + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); + + /** The executor service that */ --- End diff -- ah, right ;-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1133#discussion_r39525474 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, + * which means that never more than one trigger will be executed at the same time. + * + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at + * different times, the one scheduled for the later time will be executed after the one scheduled for the + * earlier time. 
+ */ +public class TriggerTimer { + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); + + /** The executor service that */ --- End diff -- some text missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2675) Add utilities for scheduled triggers
[ https://issues.apache.org/jira/browse/FLINK-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745609#comment-14745609 ] ASF GitHub Bot commented on FLINK-2675: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1133#discussion_r39525474 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded, + * which means that never more than one trigger will be executed at the same time. + * + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at + * different times, the one scheduled for the later time will be executed after the one scheduled for the + * earlier time. 
+ */ +public class TriggerTimer { + + /** The thread group that holds all trigger timer threads */ + public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers"); + + /** The executor service that */ --- End diff -- some text missing > Add utilities for scheduled triggers > > > Key: FLINK-2675 > URL: https://issues.apache.org/jira/browse/FLINK-2675 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > These utilities help schedule triggers for the future, ensure non-concurrent > trigger execution, and proper trigger shutdown and release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1097#issuecomment-140347951 If you add a type field (or enum) to the option, you can check if you can cast the string into the requested type. Since we would only support a fixed set of types (the primitives I listed above) this should be quite easy.
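The suggestion above, a fixed set of supported types plus a parse check on the raw string value, could be sketched as follows. The `OptionType` enum and helper method are illustrative assumptions for this thread, not the eventual Flink API:

```java
// Sketch of the suggested type check: a closed set of primitive option types,
// and a helper that reports whether a raw string parses as the declared type.
class OptionTypeCheck {

    enum OptionType { INT, LONG, DOUBLE, BOOLEAN, STRING }

    static boolean isCastableTo(String value, OptionType type) {
        try {
            switch (type) {
                case INT:     Integer.parseInt(value);   return true;
                case LONG:    Long.parseLong(value);     return true;
                case DOUBLE:  Double.parseDouble(value); return true;
                case BOOLEAN: // Boolean.parseBoolean() accepts anything, so check explicitly
                    return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
                default:      return true; // STRING always parses
            }
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

A required-parameter check could then reject `-p abc` for an option declared with `type(Integer.class)` before the job ever runs.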
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745212#comment-14745212 ] ASF GitHub Bot commented on FLINK-2017: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1097#issuecomment-140347951 If you add a type field (or enum) to the option, you can check if you can cast the string into the requested type. Since we would only support a fixed set of types (the primitives I listed above) this should be quite easy. > Add predefined required parameters to ParameterTool > --- > > Key: FLINK-2017 > URL: https://issues.apache.org/jira/browse/FLINK-2017 > Project: Flink > Issue Type: Improvement >Affects Versions: 0.9 >Reporter: Robert Metzger > Labels: starter > > In FLINK-1525 we've added the {{ParameterTool}}. > During the PR review, there was a request for required parameters. > This issue is about implementing a facility to define required parameters. > The tool should also be able to print a help menu with a list of all > parameters. > This test case shows my initial ideas how to design the API > {code} > @Test > public void requiredParameters() { > RequiredParameters required = new RequiredParameters(); > Option input = required.add("input").alt("i").help("Path to > input file or directory"); // parameter with long and short variant > required.add("output"); // parameter only with long variant > Option parallelism = > required.add("parallelism").alt("p").type(Integer.class); // parameter with > type > Option spOption = > required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number > specifying the number of parallel data source instances"); // parameter with > default value, specifying the type. 
> Option executionType = > required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined", > "batch"); > ParameterUtil parameter = ParameterUtil.fromArgs(new > String[]{"-i", "someinput", "--output", "someout", "-p", "15"}); > required.check(parameter); > required.printHelp(); > required.checkAndPopulate(parameter); > String inputString = input.get(); > int par = parallelism.getInteger(); > String output = parameter.get("output"); > int sourcePar = parameter.getInteger(spOption.getName()); > } > {code}
[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script
[ https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745227#comment-14745227 ] ASF GitHub Bot commented on FLINK-2641: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1129#issuecomment-140349703 Thanks for the update. Good to merge, IMO > Integrate the off-heap memory configuration with the TaskManager start script > - > > Key: FLINK-2641 > URL: https://issues.apache.org/jira/browse/FLINK-2641 > Project: Flink > Issue Type: New Feature > Components: Start-Stop Scripts >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Maximilian Michels > Fix For: 0.10 > > > The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and > {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory > settings.
[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1129#issuecomment-140349703 Thanks for the update. Good to merge, IMO
[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745245#comment-14745245 ] ASF GitHub Bot commented on FLINK-2017: --- Github user mliesenberg commented on a diff in the pull request: https://github.com/apache/flink/pull/1097#discussion_r39497481 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Facility to manage required parameters in user defined functions. + */ +public class RequiredParameter { + + private static final String HELP_TEXT_PARAM_DELIMITER = "\t"; + private static final String HELP_TEXT_LINE_DELIMITER = "\n"; + + private HashMapdata; + + public RequiredParameter() { + this.data = new HashMap<>(); + } + + public void add(Option option) { + this.data.put(option.getName(), option); + } + + /** +* Check if all parameters defined as required have been supplied. +* +* @param parameterTool - parameters supplied by user. 
+*/ + public void check(ParameterTool parameterTool) throws RequiredParameterException { + for (Option o : data.values()) { + // if the parameter is not present or its value is undefined, throw a RuntimeException. + if (!parameterTool.data.containsKey(o.getName()) || keyIsUndefined(o.getName(), parameterTool.data)) { --- End diff -- Sounds good to me. > Add predefined required parameters to ParameterTool > --- > > Key: FLINK-2017 > URL: https://issues.apache.org/jira/browse/FLINK-2017 > Project: Flink > Issue Type: Improvement >Affects Versions: 0.9 >Reporter: Robert Metzger > Labels: starter > > In FLINK-1525 we've added the {{ParameterTool}}. > During the PR review, there was a request for required parameters. > This issue is about implementing a facility to define required parameters. > The tool should also be able to print a help menu with a list of all > parameters. > This test case shows my initial ideas how to design the API > {code} > @Test > public void requiredParameters() { > RequiredParameters required = new RequiredParameters(); > Option input = required.add("input").alt("i").help("Path to > input file or directory"); // parameter with long and short variant > required.add("output"); // parameter only with long variant > Option parallelism = > required.add("parallelism").alt("p").type(Integer.class); // parameter with > type > Option spOption = > required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number > specifying the number of parallel data source instances"); // parameter with > default value, specifying the type. 
> Option executionType = > required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined", > "batch"); > ParameterUtil parameter = ParameterUtil.fromArgs(new > String[]{"-i", "someinput", "--output", "someout", "-p", "15"}); > required.check(parameter); > required.printHelp(); > required.checkAndPopulate(parameter); > String inputString = input.get(); > int par = parallelism.getInteger(); > String output = parameter.get("output"); > int sourcePar = parameter.getInteger(spOption.getName()); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
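The check logic shown in the diff above can be sketched in plain Java. This is a simplified stand-in, not the actual Flink classes: `RequiredParametersSketch` and `RequiredParameterException` are hypothetical names, and the supplied parameters are modeled as a plain map rather than a `ParameterTool`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the exception type in the diff; not the Flink API.
class RequiredParameterException extends RuntimeException {
    RequiredParameterException(String message) { super(message); }
}

// Minimal sketch of the required-parameter check: every registered name
// must be present with a non-empty value, otherwise an exception is thrown.
class RequiredParametersSketch {
    private final Map<String, String> required = new HashMap<>(); // name -> help text

    public void add(String name, String help) {
        required.put(name, help);
    }

    // Throw if any required parameter is missing or its value is undefined.
    public void check(Map<String, String> supplied) {
        for (Map.Entry<String, String> e : required.entrySet()) {
            String value = supplied.get(e.getKey());
            if (value == null || value.isEmpty()) {
                throw new RequiredParameterException(
                        "Missing required parameter: " + e.getKey()
                                + " (" + e.getValue() + ")");
            }
        }
    }
}
```

The real implementation additionally handles default values, type checks, and help-text printing, which this sketch omits.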
[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14745260#comment-14745260 ] ASF GitHub Bot commented on FLINK-2167: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1127#issuecomment-140355813 Hi @chiwanpark, yes I think splitting it up makes sense. I just opened this PR to get some feedback and to show why my changes are necessary to integrate new input formats like HCatalog. You can ignore the `HCatTableSource` class as it is still untested anyway. > Add fromHCat() to TableEnvironment > -- > > Key: FLINK-2167 > URL: https://issues.apache.org/jira/browse/FLINK-2167 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Minor > Labels: starter > > Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} > from an HCatalog table. > The implementation could reuse Flink's HCatInputFormat. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1919] add HCatOutputFormat
Github user jamescao commented on the pull request: https://github.com/apache/flink/pull/1079#issuecomment-140359144 @chiwanpark the PR is now updated. I pulled out the code related to HCatInputFormat and incorporated FLINK-2555 and FLINK-2617. I also changed the test environment from `CollectionEnvironment` to `LocalEnvironment` to cover testing of the de-serialization of the HCatOutputFormat. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-2674) Rework windowing logic
Stephan Ewen created FLINK-2674: --- Summary: Rework windowing logic Key: FLINK-2674 URL: https://issues.apache.org/jira/browse/FLINK-2674 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen Priority: Critical Fix For: 0.10 The windowing logic needs a major overhaul. This follows the design documents: - https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830 Specifically, the following shortcomings need to be addressed: - Global parallel windows should be dropped -> for time, local windows are aligned and serve the same purpose -> there is currently no known robust and efficient parallel implementation of custom strategies - Event time and out-of-order arrival need to be supported - Eviction of non-accessed keys does not work; such keys linger indefinitely - Performance is currently bad for time windows, due to an overly general implementation - Resources are leaking, threads are not shut down - Elements are stored multiple times (discretizers, window buffers) - Finally, many implementations are buggy and produce wrong results -- This message was sent by Atlassian JIRA (v6.3.4#6332)
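The "local windows are aligned" point above comes down to simple arithmetic: for a tumbling time window of a fixed size, every parallel instance can derive identical window boundaries from a timestamp alone, with no global coordination. A minimal sketch (not Flink code; assumes non-negative epoch timestamps):

```java
// Sketch of aligned tumbling-window boundary computation. Because the
// boundaries depend only on the timestamp and the window size, every
// parallel instance computes the same windows without coordination.
class WindowMath {
    // Start of the tumbling window of the given size containing the timestamp.
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Exclusive end of that window.
    public static long windowEnd(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize;
    }
}
```

For example, with a 1000 ms window, timestamps 1000 through 1999 all fall into the window [1000, 2000).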
[jira] [Created] (FLINK-2675) Add utilities for scheduled triggers
Stephan Ewen created FLINK-2675: --- Summary: Add utilities for scheduled triggers Key: FLINK-2675 URL: https://issues.apache.org/jira/browse/FLINK-2675 Project: Flink Issue Type: Sub-task Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.10 These utilities help schedule triggers for the future, ensure non-concurrent trigger execution, and provide proper trigger shutdown and release. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
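One way such a utility could look, sketched with the JDK only (this is an illustrative assumption, not the actual Flink implementation): a single-threaded `ScheduledExecutorService` serializes trigger firings, so no two triggers run concurrently, and shutting it down cancels pending triggers and releases the worker thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical trigger-scheduling utility, sketched for illustration.
class TriggerScheduler {
    // A single worker thread guarantees non-concurrent trigger execution.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Schedule a trigger to fire after the given delay; the returned
    // future can be used to cancel an individual trigger.
    public ScheduledFuture<?> schedule(Runnable trigger, long delayMillis) {
        return executor.schedule(trigger, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Proper shutdown: cancel pending triggers and release the thread.
    public void shutdown() {
        executor.shutdownNow();
    }
}
```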
[jira] [Created] (FLINK-2678) Java/Scala API does not support multi-dimensional arrays as keys
Till Rohrmann created FLINK-2678: Summary: Java/Scala API does not support multi-dimensional arrays as keys Key: FLINK-2678 URL: https://issues.apache.org/jira/browse/FLINK-2678 Project: Flink Issue Type: Wish Components: Java API, Scala API Reporter: Till Rohrmann Priority: Minor The Java/Scala API does not support grouping/sorting on fields which are multi-dimensional arrays. It could be helpful to also support these types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
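Until such support exists, a common workaround (a sketch under the assumption that the user controls the key type; `DeepArrayKey` is a hypothetical name, not Flink API) is to wrap the array in a key type that gives it value semantics via `Arrays.deepEquals` and `Arrays.deepHashCode`:

```java
import java.util.Arrays;

// Hypothetical wrapper giving a multi-dimensional array value semantics so it
// can serve as a grouping key. Nested arrays are compared element-wise.
final class DeepArrayKey {
    private final Object[] array;

    public DeepArrayKey(Object[] array) {
        this.array = array;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof DeepArrayKey
                && Arrays.deepEquals(array, ((DeepArrayKey) other).array);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(array);
    }
}
```

Plain `equals`/`hashCode` on arrays use reference identity, which is why two structurally equal multi-dimensional arrays would otherwise never group together.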
[jira] [Updated] (FLINK-2679) Scala API does not support Try type as key
[ https://issues.apache.org/jira/browse/FLINK-2679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2679: - Summary: Scala API does not support Try type as key (was: Scala API does not support Try type as keys) > Scala API does not support Try type as key > -- > > Key: FLINK-2679 > URL: https://issues.apache.org/jira/browse/FLINK-2679 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: Till Rohrmann >Priority: Minor > > The Scala API does not support using the {{Try}} type as a key (e.g. for > sorting or grouping). It could be helpful to add support for this type. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2679) Scala API does not support Try type as keys
Till Rohrmann created FLINK-2679: Summary: Scala API does not support Try type as keys Key: FLINK-2679 URL: https://issues.apache.org/jira/browse/FLINK-2679 Project: Flink Issue Type: Wish Components: Scala API Reporter: Till Rohrmann Priority: Minor The Scala API does not support using the {{Try}} type as a key (e.g. for sorting or grouping). It could be helpful to add support for this type. -- This message was sent by Atlassian JIRA (v6.3.4#6332)