[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...
Github user chobeat commented on a diff in the pull request: https://github.com/apache/flink/pull/2149#discussion_r7389 --- Diff: docs/apis/cli.md --- @@ -187,6 +187,8 @@ Action "run" compiles and runs a program. java.net.URLClassLoader}. -d,--detached If present, runs the job in detached mode +--configDir The configuration directory with which --- End diff -- This is a space, I checked. Do you want it to be aligned to the beginning of the other lines or is it ok like it is? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469960#comment-15469960 ] ASF GitHub Bot commented on FLINK-3042: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r7227 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java --- @@ -122,14 +123,25 @@ public abstract Class getTypeClass(); /** -* Returns the generic parameters of this type. +* Optional method for giving Flink's type extraction system information about the mapping +* of a generic type parameter to the type information of a subtype. This information is necessary +* in cases where type information should be deduced from an input type. * -* @return The list of generic parameters. This list can be empty. +* For instance, a method for a {@link Tuple2} would look like this: +* +* Map m = new HashMap(); +* m.put("T0", this.getTypeAt(0)); +* m.put("T1", this.getTypeAt(1)); +* return m; +* +* +* @return map of inferred subtypes; it must not contain all generic parameters as key; --- End diff -- You are right, "it doesn't have to contain..." would be better. What I wanted to say is: It is also ok to just supply partial type information for generic parameters (so returning an empty map is always acceptable). > Define a way to let types create their own TypeInformation > -- > > Key: FLINK-3042 > URL: https://issues.apache.org/jira/browse/FLINK-3042 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Timo Walther > Fix For: 1.0.0 > > > Currently, introducing new Types that should have specific TypeInformation > requires > - Either integration with the TypeExtractor > - Or manually constructing the TypeInformation (potentially at every place) > and using type hints everywhere. 
> I propose to add a way to allow classes to create their own TypeInformation > (like a static method "createTypeInfo()"). > To support generic nested types (like Optional / Either), the type extractor > would provide a Map of what generic variables map to what types (deduced from > the input). The class can use that to create the correct nested > TypeInformation (possibly by calling the TypeExtractor again, passing the Map > of generic bindings). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
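The proposal above can be illustrated with a small, self-contained sketch. This is not Flink's real TypeInformation API; the class and method names below are simplified stand-ins, used only to show the idea of mapping generic variable names (e.g. "T0", "T1") to nested type information, with an empty map as the always-acceptable default:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's TypeInformation; names are illustrative only.
abstract class TypeInfo {
    // By default no mapping is known; returning an empty map is always acceptable,
    // and partial mappings are fine too.
    public Map<String, TypeInfo> getGenericParameters() {
        return new HashMap<>();
    }
}

class IntTypeInfo extends TypeInfo {}
class StringTypeInfo extends TypeInfo {}

// A Tuple2-like type info that maps its generic variables T0/T1 to field types.
class Tuple2TypeInfo extends TypeInfo {
    private final TypeInfo f0;
    private final TypeInfo f1;

    Tuple2TypeInfo(TypeInfo f0, TypeInfo f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    TypeInfo getTypeAt(int pos) {
        return pos == 0 ? f0 : f1;
    }

    @Override
    public Map<String, TypeInfo> getGenericParameters() {
        Map<String, TypeInfo> m = new HashMap<>();
        m.put("T0", getTypeAt(0));
        m.put("T1", getTypeAt(1));
        return m;
    }
}
```

A type extractor could consult such a map to deduce, for example, that the `T0` of an input `Tuple2<Integer, String>` is an integer type.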
[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script
[ https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469963#comment-15469963 ] ASF GitHub Bot commented on FLINK-4084: --- Github user chobeat commented on a diff in the pull request: https://github.com/apache/flink/pull/2149#discussion_r7389 --- Diff: docs/apis/cli.md --- @@ -187,6 +187,8 @@ Action "run" compiles and runs a program. java.net.URLClassLoader}. -d,--detached If present, runs the job in detached mode +--configDir The configuration directory with which --- End diff -- This is a space, I checked. Do you want it to be aligned to the beginning of the other lines or is it ok like it is? > Add configDir parameter to CliFrontend and flink shell script > - > > Key: FLINK-4084 > URL: https://issues.apache.org/jira/browse/FLINK-4084 > Project: Flink > Issue Type: Improvement > Components: Client >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Andrea Sella >Priority: Minor > > At the moment there is no other way than the environment variable > FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if > it is started via the flink shell script. In order to improve the user > experience, I would propose to introduce a {{--configDir}} parameter which the > user can use to specify a configuration directory more easily.
[jira] [Commented] (FLINK-4547) when call connect method in AkkaRpcService using same address and same rpc gateway class, the returned gateways are equal with respect to equals and hashCode
[ https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470051#comment-15470051 ] ASF GitHub Bot commented on FLINK-4547: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2455#discussion_r77782622 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -189,7 +191,49 @@ public void stop() { rpcEndpoint.tell(Processing.STOP, ActorRef.noSender()); } - // + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null) { + return false; + } + + if(Proxy.isProxyClass(o.getClass())) { + return o.equals(this); + } --- End diff -- Alright, I understand that now :-). > when call connect method in AkkaRpcService using same address and same rpc > gateway class, the returned gateways are equal with respect to equals and > hashCode > - > > Key: FLINK-4547 > URL: https://issues.apache.org/jira/browse/FLINK-4547 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: zhangjing >Assignee: zhangjing > > Currently, every call to the connect method of the AkkaRpcService class with the same > address and the same RPC gateway class returns a completely different gateway object, > whose equals and hashCode do not match. > Maybe it's reasonable to have the same result (equals returns true, and the > hashCode is the same) when using the same address and the same gateway class.
[GitHub] flink pull request #2455: [FLINK-4547] [cluster management] when call connec...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2455#discussion_r77782622 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java --- @@ -189,7 +191,49 @@ public void stop() { rpcEndpoint.tell(Processing.STOP, ActorRef.noSender()); } - // + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null) { + return false; + } + + if(Proxy.isProxyClass(o.getClass())) { + return o.equals(this); + } --- End diff -- Alright, I understand that now :-).
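The `Proxy.isProxyClass` branch discussed in this diff flips the comparison so that the proxy's invocation handler performs the real equality check. A minimal, self-contained sketch of the pattern (the `Gateway` interface, the `RpcHandler` class, and the `address` field are assumptions for illustration, not Flink's actual classes):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Objects;

interface Gateway {}

// Sketch of an AkkaInvocationHandler-style handler whose equality is based on
// the target address, so two proxies for the same address compare equal.
class RpcHandler implements InvocationHandler {
    final String address;

    RpcHandler(String address) { this.address = address; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Route Object methods (equals/hashCode/toString) to this handler.
        return method.invoke(this, args);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null) return false;
        // When compared against a dynamic proxy, flip the comparison so the
        // proxy's own invocation handler (another RpcHandler) does the check.
        if (Proxy.isProxyClass(o.getClass())) {
            return o.equals(this);
        }
        return o instanceof RpcHandler && address.equals(((RpcHandler) o).address);
    }

    @Override
    public int hashCode() { return Objects.hash(address); }
}

class Gateways {
    // Sketch of a connect() that hands out proxy-based gateways.
    static Gateway connect(String address) {
        return (Gateway) Proxy.newProxyInstance(
            Gateway.class.getClassLoader(),
            new Class<?>[]{Gateway.class},
            new RpcHandler(address));
    }
}
```

Note there is no infinite recursion: comparing proxy-to-proxy bounces once into the other proxy's handler, which then compares two raw `RpcHandler` instances directly.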
[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2452 @liuyuzhong Let's wait a couple more days for the community to respond. You don't have to close the PR.
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470412#comment-15470412 ] Simone Robutti commented on FLINK-4565: --- The resulting API should look like `ds1.join(ds2).where('a in 'b)` or `ds1.in(ds2).something('a,'b)`? > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested.
[jira] [Assigned] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true
[ https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirill Morozov reassigned FLINK-4506: - Assignee: Kirill Morozov > CsvOutputFormat defaults allowNullValues to false, even though doc and > declaration says true > > > Key: FLINK-4506 > URL: https://issues.apache.org/jira/browse/FLINK-4506 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Documentation >Reporter: Michael Wong >Assignee: Kirill Morozov >Priority: Minor > > In the constructor, it has this > {code} > this.allowNullValues = false; > {code} > But in the setAllowNullValues() method, the doc says the allowNullValues is > true by default. Also, in the declaration of allowNullValues, the value is > set to true. It probably makes the most sense to change the constructor. > {code} > /** >* Configures the format to either allow null values (writing an empty > field), >* or to throw an exception when encountering a null field. >* >* by default, null values are allowed. >* >* @param allowNulls Flag to indicate whether the output format should > accept null values. >*/ > public void setAllowNullValues(boolean allowNulls) { > this.allowNullValues = allowNulls; > } > {code}
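A hedged sketch of the fix proposed in the issue: keep the field's declared default of `true` and stop resetting it in the constructor, so the behavior matches the javadoc ("by default, null values are allowed"). The class below is a simplified stand-in, not Flink's real CsvOutputFormat:

```java
// Simplified stand-in illustrating the documented fix for FLINK-4506.
class CsvOutputFormatSketch {
    // The declaration already says true; the bug was the constructor
    // overriding it to false.
    private boolean allowNullValues = true;

    CsvOutputFormatSketch() {
        // Fixed: no longer reset allowNullValues to false here.
    }

    public void setAllowNullValues(boolean allowNulls) {
        this.allowNullValues = allowNulls;
    }

    // Writes a single field: null becomes an empty CSV field when allowed,
    // otherwise an exception is thrown.
    public String writeField(Object field) {
        if (field == null) {
            if (allowNullValues) {
                return "";
            }
            throw new RuntimeException("Cannot write null value when allowNullValues is false");
        }
        return field.toString();
    }
}
```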
[GitHub] flink pull request #2477: [FLINK-4506] CsvOutputFormat defaults allowNullVal...
GitHub user kirill-morozov-epam opened a pull request: https://github.com/apache/flink/pull/2477 [FLINK-4506] CsvOutputFormat defaults allowNullValues to false, even … You can merge this pull request into a Git repository by running: $ git pull https://github.com/kirill-morozov-epam/flink FLINK-4506 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2477.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2477
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470437#comment-15470437 ] Fabian Hueske commented on FLINK-4565: -- I thought rather of something like this: {{ds1.where('a in d2)}}. This would be the first time that a {{Table}} would be included in an expression. So not sure how easy it is to put it together and whether there are side effects to consider. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested.
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470472#comment-15470472 ] Simone Robutti commented on FLINK-4565: --- That's exactly why I'm asking it: I had no idea what to use as a right operand. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested.
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470536#comment-15470536 ] ASF GitHub Bot commented on FLINK-3950: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77814628 --- Diff: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A MeterView provides a rate of events per second over a given time period. The events are counted by a {@link Counter}. + * A history of measurements is maintained from which the rate of events is calculated on demand. + */ +public class MeterView implements Meter, View { --- End diff -- I think it would be good to add some more javadocs to this class, at least for the `timeSpanInSeconds` argument of the ctor. > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
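The idea described in the MeterView javadoc above can be sketched as a ring buffer of per-second counter snapshots, from which the events-per-second rate is derived on demand. This is a simplified illustration with illustrative names, not Flink's actual implementation:

```java
// Minimal counter stand-in for the sketch.
class CounterSketch {
    private long count;
    void inc(long n) { count += n; }
    long getCount() { return count; }
}

// Sketch of a meter view: stores one counter snapshot per second in a ring
// buffer and computes the average rate over the configured time span.
class MeterViewSketch {
    private final CounterSketch counter;
    private final long[] values;          // ring buffer of counter snapshots
    private final int timeSpanInSeconds;  // window over which the rate is averaged
    private int idx;

    MeterViewSketch(CounterSketch counter, int timeSpanInSeconds) {
        this.counter = counter;
        this.timeSpanInSeconds = timeSpanInSeconds;
        this.values = new long[timeSpanInSeconds + 1];
    }

    // Called by a background updater once per second.
    void update() {
        idx = (idx + 1) % values.length;
        values[idx] = counter.getCount();
    }

    // Rate of events per second over the configured time span.
    double getRate() {
        long oldest = values[(idx + 1) % values.length];
        return (values[idx] - oldest) / (double) timeSpanInSeconds;
    }
}
```

The `timeSpanInSeconds` constructor argument (the one the review asks to document) controls both the buffer size and the averaging window.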
[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77821528 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- I think the metric needs to be documented in the list of "System metrics".
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470419#comment-15470419 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- I am able to handle the cases for iterative jobs, but the LargeRecordHandlers are much trickier. I am checking that part and will get back on it. Currently the design is that the MemoryAllocator will be passed on to the sorters, and the memory allocator will have pre-created memory segments. If the memory allocator is created by iterative tasks, we ensure that such segments are not directly released to the memory manager and retain them until the iterative tasks receive the termination signal. In normal batch task cases, the memory allocators created are not kept for further iterations and hence we close them out. The sorters create read buffers, write buffers and large buffers, which are all statically sized. But inside the large record handler we have a dynamic way to decide the number of records needed for keys and records. Will get back on this. Any suggestions/feedback here? > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep.
> See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.)
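The pooling idea at the heart of this issue (and of the retention scheme described in the comment above) can be sketched as follows. All names here are illustrative assumptions, not Flink's MemoryManager API: released segments go back into a free list instead of becoming garbage, so iterative supersteps reuse them rather than triggering fresh allocations and GC pressure:

```java
import java.util.ArrayDeque;

// Illustrative segment pool: allocate() reuses released segments when
// possible and only creates a new byte[] when the pool is empty.
class SegmentPool {
    private final int segmentSize;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private int freshAllocations; // counts new byte[] allocations, for illustration

    SegmentPool(int segmentSize) { this.segmentSize = segmentSize; }

    byte[] allocate() {
        byte[] seg = free.poll();
        if (seg == null) {
            freshAllocations++;
            seg = new byte[segmentSize]; // only allocate when the pool is empty
        }
        return seg;
    }

    void release(byte[] seg) {
        free.push(seg); // retained for reuse instead of becoming garbage
    }

    int freshAllocations() { return freshAllocations; }
}
```

With `taskmanager.memory.preallocate=false` the behavior resembles the no-pool case: every superstep pays the allocation (and later GC) cost anew.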
[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77814628 --- Diff: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MeterView.java --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.metrics; + +/** + * A MeterView provides a rate of events per second over a given time period. The events are counted by a {@link Counter}. + * A history of measurements is maintained from which the rate of events is calculated on demand. + */ +public class MeterView implements Meter, View { --- End diff -- I think it would be good to add some more javadocs to this class, at least for the `timeSpanInSeconds` argument of the ctor.
[GitHub] flink issue #2443: [FLINK-3950] Implement MeterView
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2443 Overall a good change. Once my two comments are addressed, +1 to merge.
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470566#comment-15470566 ] ASF GitHub Bot commented on FLINK-3950: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2443 Overall a good change. Once my two comments are addressed, +1 to merge. > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[jira] [Commented] (FLINK-4506) CsvOutputFormat defaults allowNullValues to false, even though doc and declaration says true
[ https://issues.apache.org/jira/browse/FLINK-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470488#comment-15470488 ] ASF GitHub Bot commented on FLINK-4506: --- GitHub user kirill-morozov-epam opened a pull request: https://github.com/apache/flink/pull/2477 [FLINK-4506] CsvOutputFormat defaults allowNullValues to false, even … You can merge this pull request into a Git repository by running: $ git pull https://github.com/kirill-morozov-epam/flink FLINK-4506 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2477.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2477 > CsvOutputFormat defaults allowNullValues to false, even though doc and > declaration says true > > > Key: FLINK-4506 > URL: https://issues.apache.org/jira/browse/FLINK-4506 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Documentation >Reporter: Michael Wong >Assignee: Kirill Morozov >Priority: Minor > > In the constructor, it has this > {code} > this.allowNullValues = false; > {code} > But in the setAllowNullValues() method, the doc says the allowNullValues is > true by default. Also, in the declaration of allowNullValues, the value is > set to true. It probably makes the most sense to change the constructor. > {code} > /** >* Configures the format to either allow null values (writing an empty > field), >* or to throw an exception when encountering a null field. >* >* by default, null values are allowed. >* >* @param allowNulls Flag to indicate whether the output format should > accept null values. >*/ > public void setAllowNullValues(boolean allowNulls) { > this.allowNullValues = allowNulls; > } > {code}
[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77815865 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- I think the metric name should include the unit, in this case perMinute.
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470554#comment-15470554 ] ASF GitHub Bot commented on FLINK-3950: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77815865 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- I think the metric name should include the unit, in this case perMinute. > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[jira] [Commented] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet
[ https://issues.apache.org/jira/browse/FLINK-4589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470397#comment-15470397 ] ASF GitHub Bot commented on FLINK-4589: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2476 [FLINK-4589] Fix Merging of Covering Window in MergingWindowSet This also adds two new test cases for that problem. R: @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink fix-merging-set Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2476.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2476 commit 2eec6ba7ab6acf55ead2ea395a33043c87d1c911 Author: Aljoscha Krettek Date: 2016-09-07T11:51:53Z [FLINK-4589] Fix Merging of Covering Window in MergingWindowSet This also adds two new test cases for that problem. > Fix Merging of Covering Window in MergingWindowSet > -- > > Key: FLINK-4589 > URL: https://issues.apache.org/jira/browse/FLINK-4589 > Project: Flink > Issue Type: Bug > Components: Windowing Operators >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.0.4, 1.2.0, 1.1.3 > > > Right now, when a new window gets merged that covers all of the existing > windows, {{MergingWindowSet}} does not correctly set the state window.
[GitHub] flink pull request #2476: [FLINK-4589] Fix Merging of Covering Window in Mer...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2476 [FLINK-4589] Fix Merging of Covering Window in MergingWindowSet This also adds two new test cases for that problem. R: @StephanEwen You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink fix-merging-set Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2476.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2476 commit 2eec6ba7ab6acf55ead2ea395a33043c87d1c911 Author: Aljoscha Krettek Date: 2016-09-07T11:51:53Z [FLINK-4589] Fix Merging of Covering Window in MergingWindowSet This also adds two new test cases for that problem.
[jira] [Created] (FLINK-4591) Select star does not work with grouping
Timo Walther created FLINK-4591: --- Summary: Select star does not work with grouping Key: FLINK-4591 URL: https://issues.apache.org/jira/browse/FLINK-4591 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Timo Walther It would be consistent if this would also work: {{table.groupBy('*).select('*)}} Currently, the star only works in a plain select without grouping.
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470653#comment-15470653 ] ASF GitHub Bot commented on FLINK-3950: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77821528 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- I think the metric needs to be documented in the list of "System metrics". > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77787126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- The class has some docs but as you can see given my initial question, its purpose was not clear to me. Yes, I actually thought about marking `leaderSessionID` `volatile`. Given the interface of this class, every component which has a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID. I'm actually wondering whether it is not necessary to notify the components about a new leader session ID. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable to transmit the current leader session ID?
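The review suggestion (a `volatile` field plus a read-only view for components other than the ResourceManager) might look like the sketch below; the interface and class names are assumptions for illustration, not Flink's actual code:

```java
import java.util.UUID;

// Read-only view handed to components that must only observe the leader ID.
interface LeaderIdReader {
    UUID getLeaderSessionID();
}

// Sketch of a registry where only the owner (e.g. the ResourceManager), which
// holds the concrete type, is able to write the leader session ID.
class LeaderIdRegistrySketch implements LeaderIdReader {
    // volatile: writes by the owner become visible to reader threads without locking
    private volatile UUID leaderSessionID;

    void setLeaderSessionID(UUID id) {
        this.leaderSessionID = id;
    }

    @Override
    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}
```

Components such as a SlotManager would receive the registry typed as `LeaderIdReader`, so they can validate incoming messages against the current leader ID but cannot change it.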
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470114#comment-15470114 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77787126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- The class has some docs, but as you can see from my initial question, its purpose was not clear to me. Yes, I actually thought about marking `leaderSessionID` `volatile`. Given the interface of this class, every component which has a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID. I'm actually wondering whether it is not necessary to notify the components about a new leader session ID. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable to transmit the current leader session ID? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
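The registry shape and the `volatile` question in this thread can be sketched standalone. This is a minimal illustration, not the PR's code: the split into a read-only view for non-`ResourceManager` components is an assumption drawn from the reviewer's suggestion, and the class is a simplified stand-in for the diff's `LeaderIdRegistry`.

```java
import java.util.UUID;

// Read-only view: components that should only retrieve the leader session ID
// (the reviewer's suggested restriction) would be handed this interface.
interface LeaderIdRetriever {
    UUID getLeaderSessionID();
}

// Writable registry, owned by the leader-electing component. The field is
// volatile so a leader change written by one thread is visible to readers
// on other threads without additional locking.
class LeaderIdRegistry implements LeaderIdRetriever {
    private volatile UUID leaderSessionID;

    public void setLeaderSessionID(UUID id) {
        this.leaderSessionID = id;
    }

    @Override
    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}
```

Whether visibility via `volatile` suffices, or components additionally need an explicit notification callback on leadership change (as the reviewer asks), is the open design question of the thread.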
[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...
Github user liuyuzhong commented on the issue: https://github.com/apache/flink/pull/2452 @mxm OK.
[jira] [Created] (FLINK-4588) Fix Merging of Covering Window in MergingWindowSet
Aljoscha Krettek created FLINK-4588: --- Summary: Fix Merging of Covering Window in MergingWindowSet Key: FLINK-4588 URL: https://issues.apache.org/jira/browse/FLINK-4588 Project: Flink Issue Type: Bug Components: Windowing Operators Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Priority: Blocker Fix For: 1.0.4, 1.2.0, 1.1.3 Right now, when a newly merged window covers all of the existing windows, {{MergingWindowSet}} does not correctly set the state window.
[jira] [Updated] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-4579: - Issue Type: Improvement (was: Bug) > Add StateBackendFactory for RocksDB Backend > --- > > Key: FLINK-4579 > URL: https://issues.apache.org/jira/browse/FLINK-4579 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek > > Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}}, > which means that users cannot specify the RocksDB backend in the Flink > configuration. > If we add a factory for RocksDB, we should also think about adding the RocksDB > backend to the standard distribution lib; otherwise it is only usable if > users manually place the RocksDB jars in the Flink lib folder.
[jira] [Created] (FLINK-4589) Fix Merging of Covering Window in MergingWindowSet
Aljoscha Krettek created FLINK-4589: --- Summary: Fix Merging of Covering Window in MergingWindowSet Key: FLINK-4589 URL: https://issues.apache.org/jira/browse/FLINK-4589 Project: Flink Issue Type: Bug Components: Windowing Operators Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Priority: Blocker Fix For: 1.0.4, 1.2.0, 1.1.3 Right now, when a newly merged window covers all of the existing windows, {{MergingWindowSet}} does not correctly set the state window.
[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r7227 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java --- @@ -122,14 +123,25 @@ public abstract Class getTypeClass(); /** -* Returns the generic parameters of this type. +* Optional method for giving Flink's type extraction system information about the mapping +* of a generic type parameter to the type information of a subtype. This information is necessary +* in cases where type information should be deduced from an input type. * -* @return The list of generic parameters. This list can be empty. +* For instance, a method for a {@link Tuple2} would look like this: +* +* Map m = new HashMap(); +* m.put("T0", this.getTypeAt(0)); +* m.put("T1", this.getTypeAt(1)); +* return m; +* +* +* @return map of inferred subtypes; it must not contain all generic parameters as key; --- End diff -- You are right, "it doesn't have to contain..." would be better. What I wanted to say is: It is also ok to just supply partial type information for generic parameters (so returning an empty map is always acceptable).
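The Javadoc snippet under discussion can be illustrated with a standalone sketch. The classes below are simplified stand-ins for Flink's `TypeInformation`/`Tuple2TypeInfo` (not the real API): a two-component tuple type reports its generic parameters as a map from type-variable name to component type information, and, per twalthr's clarification, returning a partial or empty map is always acceptable.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for Flink's TypeInformation.
abstract class TypeInfo {
    // Optional: map generic parameter names (e.g. "T0") to subtype information.
    // Partial information -- including an empty map -- is always acceptable.
    public Map<String, TypeInfo> getGenericParameters() {
        return new HashMap<>();
    }
}

// A leaf type with no generic parameters (inherits the empty-map default).
class BasicTypeInfo extends TypeInfo {
    final Class<?> clazz;
    BasicTypeInfo(Class<?> clazz) { this.clazz = clazz; }
}

// Stand-in for a Tuple2 type information with two components.
class Tuple2TypeInfo extends TypeInfo {
    private final TypeInfo t0, t1;
    Tuple2TypeInfo(TypeInfo t0, TypeInfo t1) { this.t0 = t0; this.t1 = t1; }

    TypeInfo getTypeAt(int pos) { return pos == 0 ? t0 : t1; }

    @Override
    public Map<String, TypeInfo> getGenericParameters() {
        // Mirrors the Javadoc example: "T0"/"T1" -> component type information.
        Map<String, TypeInfo> m = new LinkedHashMap<>();
        m.put("T0", getTypeAt(0));
        m.put("T1", getTypeAt(1));
        return m;
    }
}
```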
[GitHub] flink issue #2455: [FLINK-4547] [cluster management] when call connect metho...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2455 @tillrohrmann @StephanEwen , thanks for your review. I changed the code based on your comments, covering two points: 1. Changed the JIRA and the PR subject line to better reflect the actual changes. 2. Modified the test case in AkkaRpcServiceTest that connects to an invalid address.
[jira] [Commented] (FLINK-4547) when call connect method in AkkaRpcService using same address and same rpc gateway class, the returned gateways are equal with respect to equals and hashCode
[ https://issues.apache.org/jira/browse/FLINK-4547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469997#comment-15469997 ] ASF GitHub Bot commented on FLINK-4547: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2455 @tillrohrmann @StephanEwen , thanks for your review. I changed the code based on your comments, covering two points: 1. Changed the JIRA and the PR subject line to better reflect the actual changes. 2. Modified the test case in AkkaRpcServiceTest that connects to an invalid address. > when call connect method in AkkaRpcService using same address and same rpc > gateway class, the returned gateways are equal with respect to equals and > hashCode > - > > Key: FLINK-4547 > URL: https://issues.apache.org/jira/browse/FLINK-4547 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: zhangjing >Assignee: zhangjing > > Currently, every call to the connect method of the AkkaRpcService class with the same > address and the same RPC gateway class returns a gateway object that is entirely > distinct from previously returned ones: their equals and hashCode do not match. > Maybe it is reasonable to have the same result (equals returns true, and > the hashCode is the same) when using the same address and the same gateway class.
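One way to obtain the property FLINK-4547 asks for — equal gateways for the same address and gateway class — is to cache and reuse the stub per (address, class) pair. This is a standalone illustration of that idea only; it is an assumption for the sketch, not necessarily how the PR implements it, and `CachingRpcService`/`stubFactory` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified stand-in for an RPC service handing out gateway stubs.
// Caching per (address, gateway class) makes repeated connect() calls return
// the very same instance, so equals() and hashCode() trivially agree.
class CachingRpcService {
    private final Map<String, Object> gateways = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <G> G connect(String address, Class<G> gatewayClass, Function<String, G> stubFactory) {
        String key = address + "#" + gatewayClass.getName();
        return (G) gateways.computeIfAbsent(key, k -> stubFactory.apply(address));
    }
}
```

An alternative design, if fresh stubs are required per call, is to override `equals`/`hashCode` on the stub itself to compare address and gateway class.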
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470065#comment-15470065 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77783497 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- What exactly do you mean? The class is thread-safe and documented (though documentation can be improved). There is no need for locking. Do you mean marking the leaderSessionID `volatile`? It should be fine if leader changes propagate lazily. > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77783497 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- What exactly do you mean? The class is thread-safe and documented (though documentation can be improved). There is no need for locking. Do you mean marking the leaderSessionID `volatile`? It should be fine if leader changes propagate lazily.
[jira] [Closed] (FLINK-4376) Implement job master skeleton for managing single job
[ https://issues.apache.org/jira/browse/FLINK-4376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-4376. - Resolution: Duplicate I think this duplicates https://issues.apache.org/jira/browse/FLINK-4408 > Implement job master skeleton for managing single job > -- > > Key: FLINK-4376 > URL: https://issues.apache.org/jira/browse/FLINK-4376 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Wenlong Lyu >Assignee: Kurt Young > > This JIRA targets setting up a skeleton of the new JobMaster and translating the Scala > code to Java, making the necessary changes so that the JobMaster manages only one job. > This will include: > 1. Blob-related logic > 2. SubmittedJobGraphStore > 3. checkpoint & savepoint > 4. metrics registry
[jira] [Updated] (FLINK-4406) Implement job master registration at resource manager
[ https://issues.apache.org/jira/browse/FLINK-4406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-4406: -- Assignee: (was: Kurt Young) > Implement job master registration at resource manager > - > > Key: FLINK-4406 > URL: https://issues.apache.org/jira/browse/FLINK-4406 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Wenlong Lyu > > The Job Master needs to register with the Resource Manager when starting, then watch > leadership changes of the RM and trigger re-registration.
[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r77827675 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import java.lang.reflect.Type; +import java.util.Map; +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +/** + * Base class for implementing a type information factory. A type information factory allows for + * plugging-in user-defined {@link TypeInformation} into the Flink type system. The factory is + * called during the type extraction phase if the corresponding type has been annotated with + * {@link TypeInfo}. In a hierarchy of types the closest factory will be chosen while traversing + * upwards, however, a globally registered factory has highest precedence + * (see {@link TypeExtractor#registerFactory(Type, Class)}). 
+ * + * @param <T> type for which {@link TypeInformation} is created + */ +@Public +public abstract class TypeInfoFactory<T> { + + public TypeInfoFactory() { + // default constructor --- End diff -- What is the reason for the empty no-arg constructor?
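One plausible answer to greghogan's question — an assumption for illustration, not necessarily the PR author's reason — is that factories registered by class (as in `TypeExtractor#registerFactory(Type, Class)`) are typically instantiated reflectively, which requires an accessible no-arg constructor; an explicit one documents that requirement. The classes below are simplified hypothetical stand-ins, not Flink's API.

```java
// Simplified stand-in for a factory hierarchy instantiated by class reference.
abstract class InfoFactory<T> {
    public InfoFactory() {
        // explicit default constructor, mirroring the line the review asks about
    }
    public abstract String createTypeInfo();
}

class IntInfoFactory extends InfoFactory<Integer> {
    @Override
    public String createTypeInfo() { return "IntTypeInfo"; }
}

class FactoryRegistry {
    // Reflective creation throws ReflectiveOperationException unless an
    // accessible no-arg constructor exists on the concrete factory class.
    static InfoFactory<?> instantiate(Class<? extends InfoFactory<?>> factoryClass) {
        try {
            return factoryClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("factory needs an accessible no-arg constructor", e);
        }
    }
}
```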
[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77825932 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- this just served as an example... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470728#comment-15470728 ] ASF GitHub Bot commented on FLINK-3042: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r77827675 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInfoFactory.java --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import java.lang.reflect.Type; +import java.util.Map; +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +/** + * Base class for implementing a type information factory. A type information factory allows for + * plugging-in user-defined {@link TypeInformation} into the Flink type system. The factory is + * called during the type extraction phase if the corresponding type has been annotated with + * {@link TypeInfo}. In a hierarchy of types the closest factory will be chosen while traversing + * upwards, however, a globally registered factory has highest precedence + * (see {@link TypeExtractor#registerFactory(Type, Class)}). 
+ * + * @param <T> type for which {@link TypeInformation} is created + */ +@Public +public abstract class TypeInfoFactory<T> { + + public TypeInfoFactory() { + // default constructor --- End diff -- What is the reason for the empty no-arg constructor? > Define a way to let types create their own TypeInformation > -- > > Key: FLINK-3042 > URL: https://issues.apache.org/jira/browse/FLINK-3042 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Timo Walther > Fix For: 1.0.0 > > > Currently, introducing new Types that should have specific TypeInformation > requires > - Either integration with the TypeExtractor > - Or manually constructing the TypeInformation (potentially at every place) > and using type hints everywhere. > I propose to add a way to allow classes to create their own TypeInformation > (like a static method "createTypeInfo()"). > To support generic nested types (like Optional / Either), the type extractor > would provide a Map of what generic variables map to what types (deduced from > the input). The class can use that to create the correct nested > TypeInformation (possibly by calling the TypeExtractor again, passing the Map > of generic bindings).
[jira] [Commented] (FLINK-3042) Define a way to let types create their own TypeInformation
[ https://issues.apache.org/jira/browse/FLINK-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470881#comment-15470881 ] ASF GitHub Bot commented on FLINK-3042: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r77842627 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -792,12 +832,40 @@ else if (t instanceof Class) { return null; } - + + @SuppressWarnings({"unchecked", "rawtypes"}) private TypeInformation createTypeInfoFromInput(TypeVariable returnTypeVar, ArrayList inputTypeHierarchy, Type inType, TypeInformation inTypeInfo) { TypeInformation info = null; - + + // use a factory to find corresponding type information to type variable + final ArrayList factoryHierarchy = new ArrayList<>(inputTypeHierarchy); + final TypeInfoFactory factory = getClosestFactory(factoryHierarchy, inType); + if (factory != null) { + // the type that defines the factory is last in factory hierarchy + final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1); + // defining type has generics, the factory need to be asked for a mapping of subtypes to type information + if (factoryDefiningType instanceof ParameterizedType) { --- End diff -- A TypeInformation is created here only with factories of parameterized types? > Define a way to let types create their own TypeInformation > -- > > Key: FLINK-3042 > URL: https://issues.apache.org/jira/browse/FLINK-3042 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Timo Walther > Fix For: 1.0.0 > > > Currently, introducing new Types that should have specific TypeInformation > requires > - Either integration with the TypeExtractor > - Or manually constructing the TypeInformation (potentially at every place) > and using type hints everywhere. 
> I propose to add a way to allow classes to create their own TypeInformation > (like a static method "createTypeInfo()"). > To support generic nested types (like Optional / Either), the type extractor > would provide a Map of what generic variables map to what types (deduced from > the input). The class can use that to create the correct nested > TypeInformation (possibly by calling the TypeExtractor again, passing the Map > of generic bindings).
[GitHub] flink pull request #2337: [FLINK-3042] [FLINK-3060] [types] Define a way to ...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2337#discussion_r77842627 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java --- @@ -792,12 +832,40 @@ else if (t instanceof Class) { return null; } - + + @SuppressWarnings({"unchecked", "rawtypes"}) private TypeInformation createTypeInfoFromInput(TypeVariable returnTypeVar, ArrayList inputTypeHierarchy, Type inType, TypeInformation inTypeInfo) { TypeInformation info = null; - + + // use a factory to find corresponding type information to type variable + final ArrayList factoryHierarchy = new ArrayList<>(inputTypeHierarchy); + final TypeInfoFactory factory = getClosestFactory(factoryHierarchy, inType); + if (factory != null) { + // the type that defines the factory is last in factory hierarchy + final Type factoryDefiningType = factoryHierarchy.get(factoryHierarchy.size() - 1); + // defining type has generics, the factory need to be asked for a mapping of subtypes to type information + if (factoryDefiningType instanceof ParameterizedType) { --- End diff -- A TypeInformation is created here only with factories of parameterized types?
[jira] [Created] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
Timo Walther created FLINK-4592: --- Summary: Fix flaky test ScalarFunctionsTest.testCurrentTimePoint Key: FLINK-4592 URL: https://issues.apache.org/jira/browse/FLINK-4592 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther It seems that the test is still non deterministic. {code} org.apache.flink.api.table.expressions.ScalarFunctionsTest testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest) Time elapsed: 0.083 sec <<< FAILURE! org.junit.ComparisonFailure: Wrong result for: AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') expected:<[tru]e> but was:<[fals]e> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126) at org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123) at scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87) at org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at 
org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) {code}
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470705#comment-15470705 ] ASF GitHub Bot commented on FLINK-3950: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77825932 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- this just served as an example... > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk >
[GitHub] flink pull request #2443: [FLINK-3950] Implement MeterView
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77829590 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- With the two suggested changes, we can make the example something useful ;)
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470749#comment-15470749 ] ASF GitHub Bot commented on FLINK-3950: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2443#discussion_r77829590 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -105,7 +106,9 @@ public void setup(StreamTask containingTask, StreamConfig config, Output")[config.getChainIndex()].trim(); this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName); - this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut")); + Counter c = this.metrics.counter("numRecordsOut"); + this.output = new CountingOutput(output, c); + this.metrics.meter("numRecordsOutRate", new MeterView(c, 60)); --- End diff -- With the two suggested changes, we can make the example something useful ;) > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk >
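The pattern in this diff — one counter feeding both the `CountingOutput` and a `MeterView` registered as `numRecordsOutRate` — can be sketched standalone. The classes below are simplified stand-ins, not Flink's `Counter`/`MeterView`; in particular this sketch assumes `update()` is invoked exactly once per time span, whereas Flink's registry drives updates on a per-second timer.

```java
// Simplified stand-in for a metrics counter.
class SimpleCounter {
    private long count;
    public void inc(long n) { count += n; }
    public long getCount() { return count; }
}

// Simplified stand-in for MeterView: derives an average per-second rate
// from an underlying counter instead of being incremented directly.
class SimpleMeterView {
    private final SimpleCounter counter;
    private final int timeSpanInSeconds;
    private long lastCount;
    private double rate;

    SimpleMeterView(SimpleCounter counter, int timeSpanInSeconds) {
        this.counter = counter;
        this.timeSpanInSeconds = timeSpanInSeconds;
    }

    // Assumed to be called once per time span; Flink calls this on a timer.
    void update() {
        long current = counter.getCount();
        rate = (current - lastCount) / (double) timeSpanInSeconds;
        lastCount = current;
    }

    double getRate() { return rate; }
}
```

The design point of the diff is that the meter shares the counter that `CountingOutput` already increments, so the rate comes for free without double-counting records.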
[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2440#discussion_r77856665 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; + +public final class KeyGroupRangeAssignment { + + public static final int DEFAULT_MAX_PARALLELISM = 128; + + private KeyGroupRangeAssignment() { + throw new AssertionError(); + } + + /** +* Assigns the given key to a parallel operator index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @param parallelism the current parallelism of the operator +* @return the index of the parallel operator to which the given key should be routed. +*/ + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); + } + + /** +* Assigns the given key to a key-group index. 
+* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @return the key-group to which the given key is assigned +*/ + public static final int assignToKeyGroup(Object key, int maxParallelism) { + return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + } + + /** +* Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum +* parallelism. +* +* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want +* to go beyond this boundary, this method must perform arithmetic on long values. +* +* @param maxParallelism Maximal parallelism that the job was initially created with. +* @param parallelismThe current parallelism under which the job runs. Must be <= maxParallelism. +* @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. +* @return +*/ + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( + int maxParallelism, + int parallelism, + int operatorIndex) { + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; --- End diff -- Can't we simplify this expression to ``` int start = (operatorIndex * maxParallelism) / parallelism; int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1; ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-4585) Fix broken links in index.md
[ https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4585. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink-web/commit/bb6d820f > Fix broken links in index.md > > > Key: FLINK-4585 > URL: https://issues.apache.org/jira/browse/FLINK-4585 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > The following links are broken > DataSet API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > Table API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html > Gelly > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html > The following links show "Page 'X' Has Moved to" for 1-2 sec and then > redirect to another page > DataStream API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html > programming guide > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html > redirects-to DataSet API: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > probably it should be "Basic API Concepts" > https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html > or Quick Start - > https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html > CEP > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html > redirects-to: > 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html > ML > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2478: [FLINK-4595] Close FileOutputStream in ParameterTo...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2478 [FLINK-4595] Close FileOutputStream in ParameterTool https://issues.apache.org/jira/browse/FLINK-4595 You can merge this pull request into a Git repository by running: $ git pull https://github.com/apivovarov/flink FLINK-4595 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478 commit 46f68d5fc621324368ad31fbba52bdf13abfae48 Author: Alexander Pivovarov Date: 2016-09-07T21:11:06Z [FLINK-4595] Close FileOutputStream in ParameterTool
[jira] [Commented] (FLINK-4595) Close FileOutputStream in ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471823#comment-15471823 ] ASF GitHub Bot commented on FLINK-4595: --- GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2478 [FLINK-4595] Close FileOutputStream in ParameterTool https://issues.apache.org/jira/browse/FLINK-4595 You can merge this pull request into a Git repository by running: $ git pull https://github.com/apivovarov/flink FLINK-4595 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478 commit 46f68d5fc621324368ad31fbba52bdf13abfae48 Author: Alexander Pivovarov Date: 2016-09-07T21:11:06Z [FLINK-4595] Close FileOutputStream in ParameterTool > Close FileOutputStream in ParameterTool > --- > > Key: FLINK-4595 > URL: https://issues.apache.org/jira/browse/FLINK-4595 > Project: Flink > Issue Type: Bug > Components: Java API >Affects Versions: 1.1.2 >Reporter: Alexander Pivovarov >Priority: Trivial > > ParameterTool and ParameterToolTest do not close FileOutputStream > {code} > defaultProps.store(new FileOutputStream(file), "Default file created by > Flink's ParameterUtil.createPropertiesFile()"); > {code} > {code} > props.store(new FileOutputStream(propertiesFile), "Test properties"); > {code}
[jira] [Created] (FLINK-4595) Close FileOutputStream in ParameterTool
Alexander Pivovarov created FLINK-4595: -- Summary: Close FileOutputStream in ParameterTool Key: FLINK-4595 URL: https://issues.apache.org/jira/browse/FLINK-4595 Project: Flink Issue Type: Bug Components: Java API Affects Versions: 1.1.2 Reporter: Alexander Pivovarov Priority: Trivial ParameterTool and ParameterToolTest do not close FileOutputStream {code} defaultProps.store(new FileOutputStream(file), "Default file created by Flink's ParameterUtil.createPropertiesFile()"); {code} {code} props.store(new FileOutputStream(propertiesFile), "Test properties"); {code}
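The usual fix for the pattern quoted above is try-with-resources, which closes the stream even when store(...) throws. A minimal sketch of that kind of fix; the class and helper names here (PropertiesFileDemo, writeProperties) are hypothetical stand-ins, not Flink API:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

public class PropertiesFileDemo {

    // try-with-resources guarantees the stream is closed even if
    // store(...) throws, unlike the one-liners quoted in the issue.
    public static void writeProperties(Properties props, File file, String comment) throws IOException {
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, comment);
        }
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.setProperty("input", "/tmp/in");
        File file = File.createTempFile("flink-params", ".properties");
        writeProperties(props, file, "Test properties");
        System.out.println(file.length() > 0); // true
    }
}
```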
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johannes updated FLINK-4586: Attachment: FLINK4586Test.scala Scala unit test > NumberSequenceIterator and Accumulator threading issue > -- > > Key: FLINK-4586 > URL: https://issues.apache.org/jira/browse/FLINK-4586 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.1.2 >Reporter: Johannes >Priority: Minor > Attachments: FLINK4586Test.scala > > > There is a strange problem when using the NumberSequenceIterator in > combination with an AverageAccumulator. > It seems like the individual accumulators are reinitialized and overwrite > parts of intermediate solutions. > The following Scala snippet exemplifies the problem. > Instead of printing the correct average, the result should be {{50.5}} but is > something completely different, like {{8.08}}, dependent on the number of > cores used. > If the parallelism is set to {{1}} the result is correct, which indicates a > likely threading problem. > The problem occurs using the Java and Scala APIs. > {code} > env > .fromParallelCollection(new NumberSequenceIterator(1, 100)) > .map(new RichMapFunction[Long, Long] { > var a : AverageAccumulator = _ > override def map(value: Long): Long = { > a.add(value) > value > } > override def open(parameters: Configuration): Unit = { > a = new AverageAccumulator > getRuntimeContext.addAccumulator("test", a) > } > }) > .reduce((a, b) => a + b) > .print() > val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult > println(lastJobExecutionResult.getAccumulatorResult("test")) > {code}
[jira] [Updated] (FLINK-4586) NumberSequenceIterator and Accumulator threading issue
[ https://issues.apache.org/jira/browse/FLINK-4586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johannes updated FLINK-4586: Attachment: (was: FLINK4586Test.scala) > NumberSequenceIterator and Accumulator threading issue > -- > > Key: FLINK-4586 > URL: https://issues.apache.org/jira/browse/FLINK-4586 > Project: Flink > Issue Type: Bug > Components: DataSet API >Affects Versions: 1.1.2 >Reporter: Johannes >Priority: Minor > > There is a strange problem when using the NumberSequenceIterator in > combination with an AverageAccumulator. > It seems like the individual accumulators are reinitialized and overwrite > parts of intermediate solutions. > The following Scala snippet exemplifies the problem. > Instead of printing the correct average, the result should be {{50.5}} but is > something completely different, like {{8.08}}, dependent on the number of > cores used. > If the parallelism is set to {{1}} the result is correct, which indicates a > likely threading problem. > The problem occurs using the Java and Scala APIs. > {code} > env > .fromParallelCollection(new NumberSequenceIterator(1, 100)) > .map(new RichMapFunction[Long, Long] { > var a : AverageAccumulator = _ > override def map(value: Long): Long = { > a.add(value) > value > } > override def open(parameters: Configuration): Unit = { > a = new AverageAccumulator > getRuntimeContext.addAccumulator("test", a) > } > }) > .reduce((a, b) => a + b) > .print() > val lastJobExecutionResult: JobExecutionResult = env.getLastJobExecutionResult > println(lastJobExecutionResult.getAccumulatorResult("test")) > {code}
[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471287#comment-15471287 ] ASF GitHub Bot commented on FLINK-3755: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2440#discussion_r77870173 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; + +public final class KeyGroupRangeAssignment { + + public static final int DEFAULT_MAX_PARALLELISM = 128; + + private KeyGroupRangeAssignment() { + throw new AssertionError(); + } + + /** +* Assigns the given key to a parallel operator index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @param parallelism the current parallelism of the operator +* @return the index of the parallel operator to which the given key should be routed. 
+*/ + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); + } + + /** +* Assigns the given key to a key-group index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @return the key-group to which the given key is assigned +*/ + public static final int assignToKeyGroup(Object key, int maxParallelism) { + return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + } + + /** +* Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum +* parallelism. +* +* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want +* to go beyond this boundary, this method must perform arithmetic on long values. +* +* @param maxParallelism Maximal parallelism that the job was initially created with. +* @param parallelismThe current parallelism under which the job runs. Must be <= maxParallelism. +* @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. +* @return +*/ + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( + int maxParallelism, + int parallelism, + int operatorIndex) { + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; --- End diff -- I don't think that this is giving us the correct inverse for computeOperatorIndexForKeyGroup(...). 
Our test CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates counter-examples. > Introduce key groups for key-value state to support dynamic scaling > --- > > Key: FLINK-3755 > URL: https://issues.apache.org/jira/browse/FLINK-3755 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.1.0 >Reporter:
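The claim above can be checked with a concrete case. A self-contained sketch, assuming the forward mapping is `keyGroup * parallelism / maxParallelism` (that method is not shown in the quoted diff, so this is an assumption): with maxParallelism = 10 and parallelism = 4, the simplified range for operator 1 starts at key-group 2, although key-group 2 maps forward to operator 0.

```java
import java.util.Arrays;

public class KeyGroupRangeCheck {

    // Assumed forward mapping (key-group -> operator index); not shown in the quoted diff.
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Range computation as in the pull request.
    public static int[] originalRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Simplification proposed in the review comment.
    public static int[] simplifiedRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int mp = 10, p = 4;
        System.out.println(Arrays.toString(originalRange(mp, p, 1)));   // [3, 4]
        System.out.println(Arrays.toString(simplifiedRange(mp, p, 1))); // [2, 4]
        // Key-group 2 maps forward to operator 0, so the simplified range
        // [2, 4] for operator 1 disagrees with the forward mapping.
        System.out.println(operatorIndexForKeyGroup(mp, p, 2));         // 0
    }
}
```

Under that assumed forward mapping, the original formulas give operator 0 the range [0, 2] and operator 1 the range [3, 4], which is consistent; the simplified formulas are not.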
[jira] [Created] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG
Robert Metzger created FLINK-4590: - Summary: Some Table API tests are failing when debug lvl is set to DEBUG Key: FLINK-4590 URL: https://issues.apache.org/jira/browse/FLINK-4590 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.2.0 Reporter: Robert Metzger For debugging another issue, I've set the log level on travis to DEBUG. After that, the Table API tests started failing {code} Failed tests: SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule {code} Probably Calcite is executing additional assertions depending on the debug level.
[jira] [Created] (FLINK-4596) Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml
Nagarjun Guraja created FLINK-4596: -- Summary: Class not found exception when RESTART_STRATEGY is configured with fully qualified class name in the yaml Key: FLINK-4596 URL: https://issues.apache.org/jira/browse/FLINK-4596 Project: Flink Issue Type: Bug Reporter: Nagarjun Guraja CAUSE: createRestartStrategyFactory converts the configured value of the strategy name to lowercase and searches for the class name using the lowercased string. FIX: Do not lowercase the strategy config value, or lowercase it only for the switch case.
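A minimal sketch of the second suggested fix: lowercase only the copy used to match the built-in strategy names, and pass the untouched (case-sensitive) value to Class.forName. All class names below are hypothetical stand-ins, not Flink's actual factory classes:

```java
public class RestartStrategyConfigDemo {

    interface RestartStrategy {}
    static class NoRestartStrategy implements RestartStrategy {}
    // Stands in for a user-provided strategy class configured by its FQCN.
    public static class CustomRestartStrategy implements RestartStrategy {}

    // Lowercase only the switch key; class names are case-sensitive,
    // so Class.forName receives the original configured value.
    public static RestartStrategy create(String configuredValue) throws Exception {
        switch (configuredValue.toLowerCase()) {
            case "none":
                return new NoRestartStrategy();
            default:
                return (RestartStrategy) Class.forName(configuredValue)
                        .getDeclaredConstructor().newInstance();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(create("None") instanceof NoRestartStrategy); // true
        System.out.println(create(CustomRestartStrategy.class.getName())
                instanceof CustomRestartStrategy); // true
    }
}
```

Lowercasing the whole value, as described in the CAUSE, would make the default branch call Class.forName on a lowercased FQCN and throw ClassNotFoundException.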
[jira] [Commented] (FLINK-4537) ResourceManager registration with JobManager
[ https://issues.apache.org/jira/browse/FLINK-4537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472525#comment-15472525 ] ASF GitHub Bot commented on FLINK-4537: --- GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2479 [FLINK-4537] [cluster management] ResourceManager registration with JobManager This pull request is to implement ResourceManager registration with JobManager, which includes: 1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the resourceManager. If not, two or more resourceManagers may exist at the same time, and the current resourceManager is not the proper RM, so it rejects or ignores the registration. 2. Check whether a valid JobMaster exists at the given address by connecting to it. Reject registrations from invalid addresses. (Hidden in the connect logic) 3. Keep JobID and JobMasterGateway mapping relationships. 4. Start a JobMasterLeaderListener at the given JobID to listen to the leadership of the specified JobMaster. 5. Send a registration-successful ack to the jobMaster. The main differences are 6 points: 1. Add a getJobMasterLeaderRetriever method to get the job master leader retriever in HighAvailabilityServices, NonHaServices, an inner class in TaskExecutor, and TestingHighAvailabilityServices. 2. Change the registerJobMaster method logic of ResourceManager based on the above steps 3. Change the input parameters of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + jobMasterAddress + jobID 4. Change the result type of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with RetryingRegistration, from org.apache.flink.runtime.resourcemanager.RegistrationResponse to org.apache.flink.runtime.registration.RegistrationResponse 5.
Add a LeaderRetrievalListener in ResourceManager to listen to leadership of jobMaster 6. Add a test class for registerJobMaster method in ResourceManager You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4537 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2479 commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90 Author: beyond1920Date: 2016-09-01T07:27:20Z rsourceManager registration with JobManager commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619 Author: beyond1920 Date: 2016-09-07T09:53:44Z Add JobMasterLeaderRetriverListener at ResourceManager > ResourceManager registration with JobManager > > > Key: FLINK-4537 > URL: https://issues.apache.org/jira/browse/FLINK-4537 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: zhangjing > > The ResourceManager keeps tracks of all JobManager's which execute Jobs. When > a new JobManager registered, its leadership status is checked through the > HighAvailabilityServices. It will then be registered at the ResourceManager > using the {{JobID}} provided with the initial registration message. > ResourceManager should use JobID and LeaderSessionID(notified by > HighAvailabilityServices) to identify a a session to JobMaster. > When JobManager's register at ResourceManager, it takes the following 2 input > parameters : > 1. resourceManagerLeaderId: the fencing token for the ResourceManager leader > which is kept by JobMaster who send the registration > 2. JobMasterRegistration: contain address, JobID > ResourceManager need to process the registration event based on the following > steps: > 1. Check whether input resourceManagerLeaderId is as same as the current > leadershipSessionId of resourceManager. 
If not, it means that maybe two or > more resourceManager exists at the same time, and current resourceManager is > not the proper rm. so it rejects or ignores the registration. > 2. Check whether exists a valid JobMaster at the giving address by connecting > to the address. Reject the registration from invalid address.(Hidden in the > connect logic) > 3. Keep JobID and JobMasterGateway mapping relationships. > 4. Start a JobMasterLeaderListener at the given JobID to listen to the > leadership of the specified JobMaster. > 5. Send registration successful ack to the jobMaster.
[GitHub] flink pull request #2479: [FLINK-4537] [cluster management] ResourceManager ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/2479 [FLINK-4537] [cluster management] ResourceManager registration with JobManager This pull request is to implement ResourceManager registration with JobManager, which includes: 1. Check whether the input resourceManagerLeaderId is the same as the current leadershipSessionId of the resourceManager. If not, two or more resourceManagers may exist at the same time, and the current resourceManager is not the proper RM, so it rejects or ignores the registration. 2. Check whether a valid JobMaster exists at the given address by connecting to it. Reject registrations from invalid addresses. (Hidden in the connect logic) 3. Keep JobID and JobMasterGateway mapping relationships. 4. Start a JobMasterLeaderListener at the given JobID to listen to the leadership of the specified JobMaster. 5. Send a registration-successful ack to the jobMaster. The main differences are 6 points: 1. Add a getJobMasterLeaderRetriever method to get the job master leader retriever in HighAvailabilityServices, NonHaServices, an inner class in TaskExecutor, and TestingHighAvailabilityServices. 2. Change the registerJobMaster method logic of ResourceManager based on the above steps 3. Change the input parameters of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with registerTaskExecutor, from jobMasterRegistration to resourceManagerLeaderId + jobMasterAddress + jobID 4. Change the result type of the registerJobMaster method in the ResourceManager and ResourceManagerGateway classes to be consistent with RetryingRegistration, from org.apache.flink.runtime.resourcemanager.RegistrationResponse to org.apache.flink.runtime.registration.RegistrationResponse 5. Add a LeaderRetrievalListener in ResourceManager to listen to leadership of jobMaster 6.
Add a test class for registerJobMaster method in ResourceManager You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink jira-4537 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2479 commit fa66ac8ae86745dc9daf1fb07c6c96be4f336c90 Author: beyond1920 Date: 2016-09-01T07:27:20Z rsourceManager registration with JobManager commit f5e54a21e4a864b5ac5f2f548b6d3dea3edcb619 Author: beyond1920 Date: 2016-09-07T09:53:44Z Add JobMasterLeaderRetriverListener at ResourceManager
[jira] [Assigned] (FLINK-4551) Heartbeat Manager integration with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangjing reassigned FLINK-4551: Assignee: zhangjing > Heartbeat Manager integration with JobMaster > > > Key: FLINK-4551 > URL: https://issues.apache.org/jira/browse/FLINK-4551 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: zhangjing >Assignee: zhangjing >
[jira] [Updated] (FLINK-4427) Implement container releasing logic (Standalone / Yarn / Mesos)
[ https://issues.apache.org/jira/browse/FLINK-4427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-4427: -- Summary: Implement container releasing logic (Standalone / Yarn / Mesos) (was: Add slot / Implement container releasing logic (Standalone / Yarn / Mesos)) > Implement container releasing logic (Standalone / Yarn / Mesos) > --- > > Key: FLINK-4427 > URL: https://issues.apache.org/jira/browse/FLINK-4427 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Kurt Young > > Currently we only have allocation logic for SlotManager / ResourceManager, > for some batch job, slots that already finished can be released, thus should > trigger container release in different cluster modes.
[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example
[ https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472586#comment-15472586 ] Olga Golovneva commented on FLINK-1526: --- Hi, I was wondering what the current state of this JIRA is? I am really interested in implementing Boruvka's algorithm; I just wanted to check whether the for-loop iteration issue has been fixed by now. > Add Minimum Spanning Tree library method and example > > > Key: FLINK-1526 > URL: https://issues.apache.org/jira/browse/FLINK-1526 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Vasia Kalavri > > This issue proposes the addition of a library method and an example for > distributed minimum spanning tree in Gelly. > The DMST algorithm is very interesting because it is quite different from > PageRank-like iterative graph algorithms. It consists of distinct phases > inside the same iteration and requires a mechanism to detect convergence of > one phase to proceed to the next one. Current implementations in > vertex-centric models are quite long (>1000 lines) and hard to understand. > You can find a description of the algorithm [here | > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | > http://www.vldb.org/pvldb/vol7/p1047-han.pdf].
[jira] [Comment Edited] (FLINK-1707) Add an Affinity Propagation Library Method
[ https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15425389#comment-15425389 ] Josep Rubió edited comment on FLINK-1707 at 9/8/16 3:49 AM: Hi [~vkalavri], I changed the implementation, now: * The convergence condition changes to local modifications to the vertices that decide to be exemplars instead of no modifications in messages. This should remain the same for certain number of steps that are defined in the constructor * The values hash map is no longer needed * If a damping factor different to 0 is provided to the AP constructor, the oldValues HashMap will be populated and messages will be damped * If a damping factor of 0 is provided to the constructor the oldValues HashMap will not be created I will update the document Thanks! was (Author: joseprupi): Hi [~vkalavri], I changed the implementation now having these members for E vertices: private HashMapweights; private HashMap oldValues; private long exemplar; private int convergenceFactor; private int convergenceFactorCounter; where before it was: private HashMap weights; private HashMap values; private HashMap oldValues; private long exemplar; So: * The values hash map is no longer needed * If a damping factor different to 0 is provided to the AP constructor, the oldValues HashMap will be populated and the algorithm will work the same way it was before. * If a damping factor of 0 is provided to the constructor: -- The convergence condition changes to local modifications to the vertices that decide to be exemplars instead of no modifications in messages. This should remain the same for certain number of steps that are defined in the constructor -- There is no damping factor applied to messages -- The oldValues HashMap is not used Now I need to change the initialization of the graph, I will post the question to the dev group and see if someone can help me on doing it the best way. 
Once I finish it I will clean and comment the code, update the document and submit a new pull request. Thanks! > Add an Affinity Propagation Library Method > -- > > Key: FLINK-1707 > URL: https://issues.apache.org/jira/browse/FLINK-1707 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Josep Rubió >Priority: Minor > Labels: requires-design-doc > Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf > > > This issue proposes adding an implementation of the Affinity Propagation > algorithm as a Gelly library method and a corresponding example. > The algorithm is described in paper [1] and a description of a vertex-centric > implementation can be found in [2]. > [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf > [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf > Design doc: > https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing > Example spreadsheet: > https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472401#comment-15472401 ] Jark Wu commented on FLINK-4591: It seems that {{GROUP BY *}} is not allowed in SQL. Maybe we can throw a better exception to explain this when users use a star in groupBy. Currently, the exception is "cannot resolve [*] given input [a, b, c]", which is not clear. > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > It would be consistent if this would also work: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77943543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final Map jobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- There is a log field in RpcEndpoint, which is protected; why not use that instead? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
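The pattern suggested in the review comment above, reusing a single protected logger declared in the base class instead of redeclaring one per subclass, can be sketched like this. The class names are illustrative stand-ins, and java.util.logging is used here in place of the SLF4J logger Flink actually uses.

```java
import java.util.logging.Logger;

// Base class declares the logger once; because it is created via
// getClass(), it is still named after the concrete subclass.
abstract class EndpointBase {
    protected final Logger log = Logger.getLogger(getClass().getName());
}

// Subclass reuses the inherited protected field instead of declaring
// its own LOG field (the suggestion made for ResourceManager).
class ResourceManagerSketch extends EndpointBase {
    String loggerName() {
        return log.getName();
    }
}
```

This keeps one logger field per object while still reporting the concrete class name in log output.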
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472686#comment-15472686 ] ASF GitHub Bot commented on FLINK-4538: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77943543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final Map jobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- There is a log field in RpcEndpoint, which is protected; why not use that instead? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472703#comment-15472703 ] ASF GitHub Bot commented on FLINK-4408: --- GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/2480 [FLINK-4408][JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph Introduce JobMasterRunner to deal with job-level leader election and make the underlying job manager react properly. This runner also takes care of determining whether the job should be submitted in recovery fashion. This PR also implements the job submission skeleton and sets up the ExecutionGraph, but the interactions with the client have been marked as TODO since they may rely on something like JobClientGateway. I'd like to take care of that in a separate PR later. The main procedure of managing the lifecycle of a job is: * Once we receive a job submission request from the user, we create a JobMasterRunner to deal with it. * The JobMasterRunner will first create a leader election service to contend for the leadership of this job; once leadership is granted, it will try to do the real submission work. * Any error that occurs during the submission phase will mark this job as rejected and dropped. * Once the job is accepted, we face two levels of job retry: 1. Restarting upon execution failure, which is taken care of by the RestartStrategy and happens mainly __inside__ the ExecutionGraph 2. Loss of leadership, which is handled by the JobMasterRunner: the old ExecutionGraph will be suspended and then disposed, and retrying should re-submit the job to the JobMaster with isRecovery marked as true. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-4408 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2480 commit c5afeb1718f14b47739039b7f4695a791e2f1d20 Author: Kurt Young Date: 2016-09-08T04:00:13Z [FLINK-4408][JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph > Submit Job and setup ExecutionGraph > --- > > Key: FLINK-4408 > URL: https://issues.apache.org/jira/browse/FLINK-4408 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Xiaogang Shi >Assignee: Kurt Young > > Once granted the leadership, JM will start to execute the job. > Most code remains the same except that > (1) In old implementation where JM manages the execution of multiple jobs, JM > has to load all submitted JobGraphs from SubmittedJobGraphStore and recover > them. Now that the components creating JM will be responsible for the > recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, > without the need to load the JobGraph. > (2) JM should not rely on Akka to listen on the updates of JobStatus and > Execution. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
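The job lifecycle described in the PR text can be sketched as a small state machine. All names below are illustrative only, not Flink's actual API; the sketch simply encodes the transitions the PR description lists (leadership granted, submission failure, loss of leadership with recovery on retry).

```java
// Illustrative state machine for the job lifecycle described above.
enum JobState { CREATED, RUNNING, SUSPENDED, REJECTED }

class JobMasterRunnerSketch {
    JobState state = JobState.CREATED;
    boolean isRecovery = false;

    // leadership granted: attempt the real submission work
    void grantLeadership() { state = JobState.RUNNING; }

    // an error during the submission phase rejects and drops the job
    void submissionFailed() { state = JobState.REJECTED; }

    // loss of leadership: the old ExecutionGraph is suspended; a later
    // retry re-submits the job with isRecovery marked as true
    void revokeLeadership() {
        state = JobState.SUSPENDED;
        isRecovery = true;
    }
}
```

Restart-upon-execution-failure is deliberately absent here, since per the PR text it lives inside the ExecutionGraph's RestartStrategy, not in the runner.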
[GitHub] flink pull request #2480: [FLINK-4408][JobManager] Introduce JobMasterRunner...
GitHub user KurtYoung opened a pull request: https://github.com/apache/flink/pull/2480 [FLINK-4408][JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph Introduce JobMasterRunner to deal with job-level leader election and make the underlying job manager react properly. This runner also takes care of determining whether the job should be submitted in recovery fashion. This PR also implements the job submission skeleton and sets up the ExecutionGraph, but the interactions with the client have been marked as TODO since they may rely on something like JobClientGateway. I'd like to take care of that in a separate PR later. The main procedure of managing the lifecycle of a job is: * Once we receive a job submission request from the user, we create a JobMasterRunner to deal with it. * The JobMasterRunner will first create a leader election service to contend for the leadership of this job; once leadership is granted, it will try to do the real submission work. * Any error that occurs during the submission phase will mark this job as rejected and dropped. * Once the job is accepted, we face two levels of job retry: 1. Restarting upon execution failure, which is taken care of by the RestartStrategy and happens mainly __inside__ the ExecutionGraph 2. Loss of leadership, which is handled by the JobMasterRunner: the old ExecutionGraph will be suspended and then disposed, and retrying should re-submit the job to the JobMaster with isRecovery marked as true. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/KurtYoung/flink flink-4408 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2480 commit c5afeb1718f14b47739039b7f4695a791e2f1d20 Author: Kurt Young Date: 2016-09-08T04:00:13Z [FLINK-4408][JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471057#comment-15471057 ] ASF GitHub Bot commented on FLINK-3755: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2440#discussion_r77856665 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; + +public final class KeyGroupRangeAssignment { + + public static final int DEFAULT_MAX_PARALLELISM = 128; + + private KeyGroupRangeAssignment() { + throw new AssertionError(); + } + + /** +* Assigns the given key to a parallel operator index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @param parallelism the current parallelism of the operator +* @return the index of the parallel operator to which the given key should be routed. 
+*/ + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); + } + + /** +* Assigns the given key to a key-group index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @return the key-group to which the given key is assigned +*/ + public static final int assignToKeyGroup(Object key, int maxParallelism) { + return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + } + + /** +* Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum +* parallelism. +* +* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want +* to go beyond this boundary, this method must perform arithmetic on long values. +* +* @param maxParallelism Maximal parallelism that the job was initially created with. +* @param parallelismThe current parallelism under which the job runs. Must be <= maxParallelism. +* @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. +* @return +*/ + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( + int maxParallelism, + int parallelism, + int operatorIndex) { + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + int start = operatorIndex == 0 ? 
0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; --- End diff -- Can't we simplify this expression to ``` int start = (operatorIndex * maxParallelism) / parallelism; int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1; ``` > Introduce key groups for key-value state to support dynamic scaling > --- > > Key: FLINK-3755 > URL: https://issues.apache.org/jira/browse/FLINK-3755 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.1.0 >Reporter: Till
[GitHub] flink issue #2440: [FLINK-3755] Introduce key groups for key-value state to ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2440 I don't think that this is giving us the correct inverse for ```computeOperatorIndexForKeyGroup(...)```. Our test ```CheckpointCoordinatorTest::testCreateKeyGroupPartitions()``` generates counter-examples. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
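The disagreement in this thread can be checked numerically. The sketch below implements the two range formulas from the diff (the original and the reviewer's suggested simplification) together with an assumed inverse mapping keyGroup * parallelism / maxParallelism. That inverse is not shown in the thread and is a hypothetical reconstruction; under this assumption the original formula round-trips while the simplification produces the kind of counter-example the comment mentions.

```java
// Sketch of the key-group range math under review (not the actual Flink code).
final class KeyGroupMath {
    // Range formulas exactly as they appear in the diff.
    static int startOriginal(int maxPar, int par, int i) {
        return i == 0 ? 0 : ((i * maxPar - 1) / par) + 1;
    }
    static int endOriginal(int maxPar, int par, int i) {
        return ((i + 1) * maxPar - 1) / par;
    }

    // The simplification suggested in the earlier review comment.
    static int startSuggested(int maxPar, int par, int i) {
        return (i * maxPar) / par;
    }
    static int endSuggested(int maxPar, int par, int i) {
        return ((i + 1) * maxPar) / par - 1;
    }

    // ASSUMED inverse mapping from key group to operator index
    // (hypothetical; the real computeOperatorIndexForKeyGroup is not shown here).
    static int operatorIndexForKeyGroup(int maxPar, int par, int keyGroup) {
        return keyGroup * par / maxPar;
    }
}
```

For maxParallelism = 128 and parallelism = 3, the original formula yields the ranges 0-42, 43-85, 86-127, each consistent with the assumed inverse; the simplification yields 0-41, 42-84, 85-127, and key group 42 then maps back to operator 0 rather than 1.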
[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue
[ https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471566#comment-15471566 ] ASF GitHub Bot commented on FLINK-4572: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2469 Do you have a second use case in mind for adding this function to `MathUtils`? My thought would be to keep this separate to avoid confusion between signed and unsigned downcasts. > Convert to negative in LongValueToIntValue > -- > > Key: FLINK-4572 > URL: https://issues.apache.org/jira/browse/FLINK-4572 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The Gelly drivers expect that scale 32 edges, represented by the lower 32 > bits of {{long}} values, can be converted to {{int}} values. Values between > 2^31 and 2^32 - 1 should be converted to negative integers, which is not > supported by {{MathUtils.checkedDownCast}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2469 Do you have a second use case in mind for adding this function to `MathUtils`? My thought would be to keep this separate to avoid confusion between signed and unsigned downcasts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
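The conversion FLINK-4572 asks for, keeping the lower 32 bits of a long and letting values from 2^31 to 2^32 - 1 wrap to negative integers, is exactly what a plain Java narrowing cast does. A minimal sketch of that behavior (illustrative helper, not the Gelly code under review):

```java
final class SignedDownCast {
    // A plain narrowing cast keeps the lower 32 bits; values in
    // [2^31, 2^32 - 1] wrap to negative ints, which is the behavior
    // MathUtils.checkedDownCast rejects.
    static int lower32(long value) {
        return (int) value;
    }
}
```

This illustrates why the discussion above weighs a separate function against extending checkedDownCast: the two operations intentionally disagree on this value range.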
[jira] [Created] (FLINK-4594) Validate lower bound in MathUtils.checkedDownCast
Greg Hogan created FLINK-4594: - Summary: Validate lower bound in MathUtils.checkedDownCast Key: FLINK-4594 URL: https://issues.apache.org/jira/browse/FLINK-4594 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.2.0 Reporter: Greg Hogan Assignee: Greg Hogan Priority: Trivial {{MathUtils.checkedDownCast}} only compares against the upper bound {{Integer.MAX_VALUE}}, which has worked with current usage. Rather than adding a second comparison we can replace {noformat} if (value > Integer.MAX_VALUE) { {noformat} with a cast and check {noformat} if ((int)value != value) { ... {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
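The replacement proposed in FLINK-4594 can be sketched as below: the single comparison `(int) value != value` catches overflow above Integer.MAX_VALUE and underflow below Integer.MIN_VALUE at once. This is a sketch of the idea in the ticket, not the final Flink code.

```java
final class DownCastSketch {
    // true iff the long survives a round-trip through int, i.e. it lies
    // in [Integer.MIN_VALUE, Integer.MAX_VALUE]
    static boolean fitsInInt(long value) {
        return (int) value == value;
    }

    // checked downcast using the single round-trip comparison
    static int checkedDownCast(long value) {
        if (!fitsInInt(value)) {
            throw new IllegalArgumentException("Value does not fit into int: " + value);
        }
        return (int) value;
    }
}
```

Unlike a `value > Integer.MAX_VALUE` check alone, this also rejects values below the lower bound.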
[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2469 Looks good. Does it make sense to extend the `MathUtils.checkedDownCast(...)` function, or add a `MathUtils.checkedSignedDownCast(...)` function? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue
[ https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471484#comment-15471484 ] ASF GitHub Bot commented on FLINK-4572: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2469 Looks good. Does it make sense to extend the `MathUtils.checkedDownCast(...)` function, or add a `MathUtils.checkedSignedDownCast(...)` function? > Convert to negative in LongValueToIntValue > -- > > Key: FLINK-4572 > URL: https://issues.apache.org/jira/browse/FLINK-4572 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The Gelly drivers expect that scale 32 edges, represented by the lower 32 > bits of {{long}} values, can be converted to {{int}} values. Values between > 2^31 and 2^32 - 1 should be converted to negative integers, which is not > supported by {{MathUtils.checkedDownCast}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4593) Fix PageRank algorithm example
[ https://issues.apache.org/jira/browse/FLINK-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Pivovarov updated FLINK-4593: --- Flags: Patch > Fix PageRank algorithm example > -- > > Key: FLINK-4593 > URL: https://issues.apache.org/jira/browse/FLINK-4593 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > This page https://flink.apache.org/features.html shows the code which > implements the PageRank algorithm (Batch Processing Applications). > I noticed a couple of bugs in the code > Page class has a pageId field > Adjacency has just id > but in the code I see > {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code} > {code}Page(page.id, 0.15 / numPages){code} > Also, the code is not formatted (missing spaces) > {code}Page(n, 0.85*page.rank/adj.neighbors.length){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/2440#discussion_r77870173 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Preconditions; + +public final class KeyGroupRangeAssignment { + + public static final int DEFAULT_MAX_PARALLELISM = 128; + + private KeyGroupRangeAssignment() { + throw new AssertionError(); + } + + /** +* Assigns the given key to a parallel operator index. +* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @param parallelism the current parallelism of the operator +* @return the index of the parallel operator to which the given key should be routed. +*/ + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); + } + + /** +* Assigns the given key to a key-group index. 
+* +* @param key the key to assign +* @param maxParallelism the maximum supported parallelism, aka the number of key-groups. +* @return the key-group to which the given key is assigned +*/ + public static final int assignToKeyGroup(Object key, int maxParallelism) { + return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + } + + /** +* Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum +* parallelism. +* +* IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want +* to go beyond this boundary, this method must perform arithmetic on long values. +* +* @param maxParallelism Maximal parallelism that the job was initially created with. +* @param parallelismThe current parallelism under which the job runs. Must be <= maxParallelism. +* @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism. +* @return +*/ + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( + int maxParallelism, + int parallelism, + int operatorIndex) { + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero."); + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism."); + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15."); + + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1; + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism; --- End diff -- I don't think that this is giving us the correct inverse for computeOperatorIndexForKeyGroup(...). Our test CheckpointCoordinatorTest::testCreateKeyGroupPartitions()generates counter-examples. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471283#comment-15471283 ] ASF GitHub Bot commented on FLINK-3755: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/2440 I don't think that this is giving us the correct inverse for ```computeOperatorIndexForKeyGroup(...)```. Our test ```CheckpointCoordinatorTest::testCreateKeyGroupPartitions()```generates counter-examples. > Introduce key groups for key-value state to support dynamic scaling > --- > > Key: FLINK-3755 > URL: https://issues.apache.org/jira/browse/FLINK-3755 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.2.0 > > > In order to support dynamic scaling, it is necessary to sub-partition the > key-value states of each operator. This sub-partitioning, which produces a > set of key groups, allows to easily scale in and out Flink jobs by simply > reassigning the different key groups to the new set of sub tasks. The idea of > key groups is described in this design document [1]. > [1] > https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4585) Fix broken links in index.md
[ https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Pivovarov updated FLINK-4585: --- Flags: Patch > Fix broken links in index.md > > > Key: FLINK-4585 > URL: https://issues.apache.org/jira/browse/FLINK-4585 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > The following links are broken > DataSet API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > Table API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html > Gelly > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html > The following links show "Page 'X' Has Moved to" for 1-2 sec and then > redirect to another page > DataStream API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html > programming guide > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html > redirects-to DataSet API: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > probably it should be "Basic API Concepts" > https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html > or Quick Start - > https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html > CEP > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html > ML > link: > 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4593) Fix PageRank algorithm example
Alexander Pivovarov created FLINK-4593: -- Summary: Fix PageRank algorithm example Key: FLINK-4593 URL: https://issues.apache.org/jira/browse/FLINK-4593 Project: Flink Issue Type: Bug Components: Project Website Reporter: Alexander Pivovarov Priority: Minor This page https://flink.apache.org/features.html shows the code which implements the PageRank algorithm (Batch Processing Applications). I noticed a couple of bugs in the code Page class has a pageId field Adjacency has just id but in the code I see {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code} Also {code}Page(page.id, 0.15 / numPages){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4593) Fix PageRank algorithm example
[ https://issues.apache.org/jira/browse/FLINK-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Pivovarov updated FLINK-4593: --- Description: This page https://flink.apache.org/features.html shows the code which implements the PageRank algorithm (Batch Processing Applications). I noticed a couple of bugs in the code Page class has a pageId field Adjacency has just id but in the code I see {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code} {code}Page(page.id, 0.15 / numPages){code} Also, the code is not formatted (missing spaces) {code}Page(n, 0.85*page.rank/adj.neighbors.length){code} was: This page https://flink.apache.org/features.html shows the code which implements the PageRank algorithm (Batch Processing Applications). I noticed a couple of bugs in the code Page class has a pageId field Adjacency has just id but in the code I see {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code} Also {code}Page(page.id, 0.15 / numPages){code} > Fix PageRank algorithm example > -- > > Key: FLINK-4593 > URL: https://issues.apache.org/jira/browse/FLINK-4593 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > This page https://flink.apache.org/features.html shows the code which > implements the PageRank algorithm (Batch Processing Applications). > I noticed a couple of bugs in the code > Page class has a pageId field > Adjacency has just id > but in the code I see > {code}pages.join(adjacency).where("pageId").equalTo("pageId"){code} > {code}Page(page.id, 0.15 / numPages){code} > Also, the code is not formatted (missing spaces) > {code}Page(n, 0.85*page.rank/adj.neighbors.length){code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77768256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- ResourceManager keeps a relationship between resourceID and TaskExecutorGateway. Maybe we could fetch TaskExecutorGateway by resourceID using ResourceManager here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
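The slot-request flow visible in the diff above (duplicate check, choose a slot from the free pool, record the allocation in bookkeeping, then contact the TaskExecutor) can be sketched with plain collections. All names here are illustrative stand-ins, not Flink's actual classes, and the TaskExecutor call is omitted.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the SlotManager.requestSlot flow described above.
class SlotManagerSketch {
    private final Map<String, String> allocationMap = new HashMap<>(); // slotId -> allocationId
    private final Deque<String> freeSlots = new ArrayDeque<>();

    SlotManagerSketch(Collection<String> slots) {
        freeSlots.addAll(slots);
    }

    /** Returns the assigned slot id, or null for duplicated requests or when no slot is free. */
    String requestSlot(String allocationId) {
        if (allocationMap.containsValue(allocationId)) {
            return null; // duplicated slot request
        }
        String slot = freeSlots.poll(); // choose a slot from the free pool
        if (slot == null) {
            return null; // nothing free; the real manager would request new resources
        }
        allocationMap.put(slot, allocationId); // bookkeeping before contacting the TaskExecutor
        return slot;
    }
}
```

The bookkeeping-before-request ordering mirrors the diff: the allocation is recorded and the slot removed from the free pool before the TaskExecutor is asked to grant it.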
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469776#comment-15469776 ] ASF GitHub Bot commented on FLINK-4538: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77768256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- ResourceManager keeps a relationship between resourceID and TaskExecutorGateway. 
Maybe we could fetch the TaskExecutorGateway by resourceID from the ResourceManager here? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
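The lookup proposed in the comment above can be sketched as a plain registry keyed by resource ID. This is a minimal, hypothetical model — `GatewayRegistry` and the string stand-ins for `ResourceID` and `TaskExecutorGateway` are illustrative names, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of a ResourceManager-side registry that
// maps a resource ID to the gateway of the TaskExecutor registered under it.
public class GatewayRegistry {

    // In Flink this would map ResourceID -> TaskExecutorGateway;
    // plain strings stand in for both types here.
    private final Map<String, String> gatewaysByResourceId = new HashMap<>();

    // Called when a TaskExecutor registers with the ResourceManager.
    public void register(String resourceId, String gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    // The lookup the review comment proposes: resolve the gateway via the
    // ResourceManager's bookkeeping instead of storing it on each slot.
    public Optional<String> gatewayFor(String resourceId) {
        return Optional.ofNullable(gatewaysByResourceId.get(resourceId));
    }

    public static void main(String[] args) {
        GatewayRegistry registry = new GatewayRegistry();
        registry.register("tm-1", "gateway-of-tm-1");
        System.out.println(registry.gatewayFor("tm-1").orElse("unknown resource"));
    }
}
```

The design question in the thread is exactly this trade-off: keeping the gateway on each `ResourceSlot` versus resolving it on demand from the ResourceManager's registration map.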
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469797#comment-15469797 ] Simone Robutti commented on FLINK-4565: --- Actually, the confusing part is that the translation to the execution plan should already exist, but I can't find where it happens. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77769044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- There are three possible responses from the taskExecutor: 1. Ack, which means the taskExecutor grants the slot to the specified jobMaster as expected. 2. Decline, if the slot is already occupied by another AllocationID. 3. Timeout, which could be caused by a lost request or response message, or by slow network transfer. In the first case, the SlotManager needs to do nothing. In the second and third cases, however, the SlotManager should first verify and clear all previous allocation information for this slot request, and then try to find a proper slot for the request again. I think we should add logic to handle these three possible responses from the taskExecutor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
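The three-way response handling described in the comment above can be sketched as a small state machine. This is a simplified, hypothetical model — `SlotResponseModel`, its `Response` enum, and its method names are illustrative stand-ins, not Flink's actual SlotManager API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of how a SlotManager might react to the
// three possible TaskExecutor responses to a slot request: Ack (keep the
// allocation), Decline or Timeout (roll back bookkeeping and retry).
public class SlotResponseModel {

    public enum Response { ACK, DECLINE, TIMEOUT }

    // slotId -> allocationId bookkeeping, analogous to the allocationMap
    // in the quoted diff.
    private final Map<String, String> allocations = new HashMap<>();
    private int retries = 0;

    // Provisional bookkeeping recorded when the request is sent out.
    public void recordAllocation(String slotId, String allocationId) {
        allocations.put(slotId, allocationId);
    }

    // Case 1 (ACK): nothing to do. Cases 2 and 3 (DECLINE, TIMEOUT):
    // clear the provisional allocation and retry the request.
    public void handleResponse(String slotId, Response response) {
        if (response == Response.ACK) {
            return; // allocation confirmed; bookkeeping is already correct
        }
        allocations.remove(slotId); // roll back the provisional allocation
        retryRequest(slotId);       // try to find another slot for the request
    }

    private void retryRequest(String slotId) {
        retries++; // placeholder for re-running slot selection
    }

    public boolean isAllocated(String slotId) {
        return allocations.containsKey(slotId);
    }

    public int retryCount() {
        return retries;
    }
}
```

A real implementation would also need to handle the race in which a late Ack arrives after a Timeout has already triggered a retry; the model above deliberately ignores that.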
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469836#comment-15469836 ] Fabian Hueske commented on FLINK-4565: -- SQL {{IN}} does not require a special implementation because it can be translated into an inner join (as long as the subquery is not correlated with the outer query). We use Calcite to convert the subquery into a join ({{RelDecorrelator.decorrelateQuery(relNode)}}) in {{org.apache.flink.api.table.BatchTableEnvironment}} line 246. After the conversion, IN is represented and executed as a join. I think the integration with the Table API can happen in two ways: 1. Generate a RelNode plan with a subquery and let Calcite do the decorrelation before the query is optimized (same approach as SQL) 2. Generate a RelNode plan with a Join from the beginning. I would try to go for the second approach first. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
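The rewrite described above can be illustrated on plain collections: an uncorrelated `IN` is equivalent to deduplicating the subquery's values (the aggregate step) and inner-joining the outer relation on them (the join step). This is only a semantic sketch of the relational rewrite, not Table API or Calcite code; `InAsJoin` and its method are hypothetical names:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Semantic sketch: "SELECT word FROM t WHERE word IN (SELECT word FROM t1)"
// behaves like deduplicating t1 and inner-joining t against the result.
public class InAsJoin {

    public static List<String> inViaJoin(List<String> outer, List<String> subquery) {
        // Aggregate step: distinct values of the (uncorrelated) subquery.
        Set<String> distinct = new LinkedHashSet<>(subquery);
        // Join step: keep each outer row whose key matches a subquery value;
        // because the build side is distinct, no outer row is duplicated.
        return outer.stream().filter(distinct::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> wordCount = Arrays.asList("flink", "spark", "flink", "calcite");
        List<String> wordCount1 = Arrays.asList("flink", "calcite", "calcite");
        System.out.println(inViaJoin(wordCount, wordCount1)); // [flink, flink, calcite]
    }
}
```

The deduplication is what makes the rewrite correct: without the distinct step, a value appearing twice in the subquery would duplicate matching outer rows, which `IN` must not do.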
[jira] [Assigned] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Simone Robutti reassigned FLINK-4565: - Assignee: Simone Robutti > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469851#comment-15469851 ] Simone Robutti commented on FLINK-4565: --- I imagined it could happen that way, and I was going to open the Calcite source to look for it. You saved me a lot of time. I will go for the second approach, considering that it looks a lot like what happens for all the other operators I've seen. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469873#comment-15469873 ] Timo Walther commented on FLINK-4565: - Calcite translates the IN operator in {{org.apache.calcite.sql2rel.SqlToRelConverter#convertExpression}}. Calcite translates this into an Aggregate and Join. After fixing some issue in "DataSetAggregate" we can execute: {{"SELECT WordCount.word FROM WordCount WHERE WordCount.word IN (SELECT WordCount1.word AS w FROM WordCount1)"}}. The plan looks like:
{code}
== Physical Execution Plan ==
Stage 4 : Data Source
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

Stage 3 : Map
	content : from: (word, frequency)
	ship_strategy : Forward
	exchange_mode : BATCH
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 8 : Map
	content : from: (word, frequency)
	ship_strategy : Forward
	exchange_mode : BATCH
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 7 : Map
	content : prepare select: (word)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 6 : GroupCombine
	content : groupBy: (word), select:(word)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Sorted Combine
	Partitioning : RANDOM_PARTITIONED

Stage 5 : GroupReduce
	content : groupBy: (word), select:(word)
	ship_strategy : Hash Partition on [0]
	exchange_mode : PIPELINED
	driver_strategy : Sorted Group Reduce
	Partitioning : RANDOM_PARTITIONED

Stage 2 : Join
	content : where: (=(word, w)), join: (word, frequency, w)
	ship_strategy : Hash Partition on [0]
	exchange_mode : PIPELINED
	driver_strategy : Hybrid Hash (build: from: (word, frequency) (id: 3))
	Partitioning : RANDOM_PARTITIONED

Stage 1 : FlatMap
	content : select: (word)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : FlatMap
	Partitioning : RANDOM_PARTITIONED

Stage 0 : Data Sink
	content : org.apache.flink.api.java.io.DiscardingOutputFormat
	ship_strategy : Forward
	exchange_mode : PIPELINED
	Partitioning : RANDOM_PARTITIONED
{code}
> Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469879#comment-15469879 ] Timo Walther commented on FLINK-4565: - It seems my answer was too late. Yes, I would also go for the second approach. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)