[jira] [Updated] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5903: Attachment: set yarn.container.vcores to 5_RM.JPG, set yarn.container.vcores to 5_JM.JPG, set taskmanager.numberOfTaskSlots to 6.JPG
[jira] [Commented] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
[ https://issues.apache.org/jira/browse/FLINK-5903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882126#comment-15882126 ] Tao Wang commented on FLINK-5903: - I've located the reason and will fix it ASAP.
[jira] [Created] (FLINK-5903) taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode
Tao Wang created FLINK-5903: --- Summary: taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode Key: FLINK-5903 URL: https://issues.apache.org/jira/browse/FLINK-5903 Project: Flink Issue Type: Bug Components: YARN Reporter: Tao Wang Currently Flink does not respect taskmanager.numberOfTaskSlots and yarn.containers.vcores in flink-conf.yaml, but only the -s parameter on the CLI. The details: taskmanager.numberOfTaskSlots is not honored at all, and yarn.containers.vcores is only used when requesting container (TM) resources but is never made known to the TM, which means the TM will always think it has the default (1) slot if -s is not configured.
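For reference, a minimal sketch of how the defaulting behavior described above plays out when the two keys are read from flink-conf.yaml (the surrounding class is illustrative only, not Flink code):
{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class SlotConfigCheck {
    public static void main(String[] args) {
        // Loads flink-conf.yaml from the configured Flink conf directory.
        Configuration conf = GlobalConfiguration.loadConfiguration();

        // If a component ignores these keys (as the report describes for the
        // TM), it falls back to the hard-coded default of 1 slot.
        int slots = conf.getInteger("taskmanager.numberOfTaskSlots", 1);
        int vcores = conf.getInteger("yarn.containers.vcores", slots);

        System.out.println("slots=" + slots + ", vcores=" + vcores);
    }
}
{code}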
[GitHub] flink pull request #3406: [flink-5568] [Table API & SQL]Introduce interface ...
GitHub user beyond1920 opened a pull request: https://github.com/apache/flink/pull/3406 [flink-5568] [Table API & SQL] Introduce interface for catalog, and provide an in-memory implementation. Integrate external catalog with calcite catalog This PR aims to introduce an interface for the catalog, provide an in-memory implementation for testing and development, and finally integrate the external catalog with the Calcite catalog. The main changes include: 1. Introduce the ExternalCatalog abstraction, including ExternalCatalogDatabase as a database in the catalog and ExternalCatalogTable as a table in the catalog. 2. Provide an in-memory implementation for testing and development. 3. Introduce ExternalCatalogSchema, an implementation of the Calcite Schema interface. It registers databases in the ExternalCatalog as Calcite schemas, and tables in a database as Calcite tables. 4. Add the ExternalCatalogCompatible annotation. A TableSource with this annotation can be converted to or from an ExternalCatalogTable. ExternalCatalogTableConverter is the converter between ExternalCatalogTable and TableSource. 5. Introduce the CatalogTableHelper utility. It has two responsibilities: * automatically find the TableSources that carry the ExternalCatalogCompatible annotation. * convert an ExternalCatalogTable instance to a TableSourceTable instance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink dev Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3406.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3406 commit d0e1ab20078adc4f788e9c2d2c167f0251ae3476 Author: jingzhang Date: 2017-02-22T11:28:08Z Introduce interface for external catalog, and provide an in-memory implementation for test or develop. Integrate with calcite catalog.
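For orientation, a hypothetical sketch of the catalog abstraction this PR describes; the type names come from the PR summary, but the method signatures are illustrative assumptions, not the actual code:
```java
import java.util.List;

// Hypothetical shape of the proposed external catalog abstraction.
interface ExternalCatalog {
    List<String> listDatabases();
    ExternalCatalogDatabase getDatabase(String dbName);
    List<String> listTables(String dbName);
    ExternalCatalogTable getTable(String dbName, String tableName);
}

// Opaque placeholders for the metadata types named in the PR summary.
class ExternalCatalogDatabase { /* database name, properties, ... */ }
class ExternalCatalogTable { /* table name, schema, connector properties, ... */ }
```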
[jira] [Updated] (FLINK-5570) Support register external catalog to table environment
[ https://issues.apache.org/jira/browse/FLINK-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jingzhang updated FLINK-5570: - Description: This issue aims to support registering one or more {{ExternalCatalog}}s (referred to in https://issues.apache.org/jira/browse/FLINK-5568) with {{TableEnvironment}}. After registration, SQL and Table API queries can access tables in the external catalogs without registering those tables one by one with {{TableEnvironment}} beforehand. We plan to add two APIs to {{TableEnvironment}}: 1. register an external catalog: {code} def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit {code} 2. scan a table from a registered catalog and return the resulting {{Table}}; this API is very useful in Table API queries: {code} def scan(catalogName: String, tableIdentifier: TableIdentifier): Table {code} > Support register external catalog to table environment > -- > > Key: FLINK-5570 > URL: https://issues.apache.org/jira/browse/FLINK-5570 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Kurt Young > Assignee: jingzhang
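A hedged usage sketch of the two proposed methods, written against the Scala signatures above (the InMemoryExternalCatalog name comes from FLINK-5568; the TableIdentifier constructor shown here is an assumption):
{code}
// Hypothetical usage; tableEnv is an existing TableEnvironment.
public static Table scanFromCatalog(TableEnvironment tableEnv) {
    ExternalCatalog catalog = new InMemoryExternalCatalog();
    tableEnv.registerExternalCatalog("demo", catalog);

    // Access "db1.orders" without registering the table individually first.
    return tableEnv.scan("demo", new TableIdentifier("db1", "orders"));
}
{code}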
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5902: Attachment: chrome is ok.png
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: (was: IE11 with problem.png)
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5902: Attachment: IE 11 with problem.png
[jira] [Created] (FLINK-5902) Some images can not show in IE
Tao Wang created FLINK-5902: --- Summary: Some images can not show in IE Key: FLINK-5902 URL: https://issues.apache.org/jira/browse/FLINK-5902 Project: Flink Issue Type: Bug Components: Webfrontend Environment: IE Reporter: Tao Wang Some images on the Overview page do not show in IE, while they display fine in Chrome. I'm using IE 11, but I assume the same happens with IE9. I'll paste the screenshot later.
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: IE11 with problem.png
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Description: The DAG of running jobs does not render properly in IE 11 (I am using 11.0.9600.18059, but I assume the same happens with IE9): the task description is not shown within the rectangle. Chrome works fine. I pasted the screenshots under IE and Chrome below. was: The DAG of running jobs does not render properly in IE 11 (I am using 11.0.9600.18059, but I assume the same happens with IE9): the task description is not shown within the rectangle. Chrome works fine.
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882058#comment-15882058 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102886003 (the diff is reproduced in the GitHub message below): if remainingPredicate is empty, we should also remove the calc node. > Add FilterableTableSource interface and translation rule > > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction.
[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...
Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r102886003
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
+import org.apache.flink.table.plan.util.RexProgramExpressionExtractor._
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.FilterableTableSource
+
+class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
+  operand(classOf[DataStreamCalc],
+    operand(classOf[StreamTableSourceScan], none)),
+  "PushFilterIntoStreamTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall) = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+    scan.tableSource match {
+      case _: FilterableTableSource =>
+        calc.calcProgram.getCondition != null
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
+    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
+
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource]
+
+    val program = calc.calcProgram
+    val tst = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val predicates = extractPredicateExpressions(
+      program,
+      call.builder().getRexBuilder,
+      tst.tableEnv.getFunctionCatalog)
+
+    if (predicates.length != 0) {
+      val remainingPredicate = filterableSource.setPredicate(predicates)
--- End diff --
if remainingPredicate is empty, we should also remove the calc node.
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: using chrom(same job).png
[jira] [Updated] (FLINK-5901) DAG can not show properly in IE
[ https://issues.apache.org/jira/browse/FLINK-5901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Wang updated FLINK-5901: Attachment: using IE.png
[jira] [Created] (FLINK-5901) DAG can not show properly in IE
Tao Wang created FLINK-5901: --- Summary: DAG can not show properly in IE Key: FLINK-5901 URL: https://issues.apache.org/jira/browse/FLINK-5901 Project: Flink Issue Type: Bug Components: Webfrontend Environment: IE 11 Reporter: Tao Wang The DAG of running jobs does not render properly in IE 11 (I am using 11.0.9600.18059, but I assume the same happens with IE9): the task description is not shown within the rectangle. Chrome works fine.
[jira] [Comment Edited] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999 ] godfrey he edited comment on FLINK-5859 at 2/24/17 5:59 AM: Hi [~fhueske], thanks for your advice. Yes, partition pruning is a kind of coarse-grained filter push-down; filter push-down and partition pruning share a common part, which is extracting predicates from the filter condition based on the interests of different data sources. But filter push-down and partition pruning are independent concepts in general. The following table shows that different data sources have different traits: ||Trait||Example|| |filter push-down only|MySQL, HBase| |partition pruning only|CSV, TEXT| |both filter push-down and partition pruning|Parquet, Druid| IMO, we should provide a clear concept for developers, as [~ykt836] mentioned above, that includes both FilterableTableSource and PartitionableTableSource. Looking forward to your advice, thanks. was (Author: godfreyhe): Hi [~fhueske], thanks for your advice. Yes, partition pruning is a kind of coarse-grained filter push-down; filter push-down and partition pruning share common parts, which are extracting predicates from the filter condition based on the interests of different data sources. But, IMO, filter push-down and partition pruning are independent concepts in general. The following table shows that different data sources have different traits: ||Trait||Example|| |filter push-down only|MySQL, HBase| |partition pruning only|CSV, TEXT| |both filter push-down and partition pruning|Parquet, Druid| IMO, we should provide a clear concept for developers, as [~ykt836] mentioned above, that includes both FilterableTableSource and PartitionableTableSource. Looking forward to your advice, thanks. > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: godfrey he > Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid, and many > queries only need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user's > queries. Both query optimization time and execution time can be reduced > noticeably, especially for a large partitioned table.
[jira] [Commented] (FLINK-5891) ConnectedComponents is broken when object reuse enabled
[ https://issues.apache.org/jira/browse/FLINK-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882017#comment-15882017 ] Xingcan Cui commented on FLINK-5891: Thanks for your explanation, [~greghogan]. I'm afraid my PR on https://issues.apache.org/jira/browse/FLINK-1526 has the same problem, as I also store values with non-primitive types (anyhow, primitive types will not be affected, right?) from the received messages. I saw the following code in Flink's CEP library. To avoid the reference problem, it makes a deep copy of each {{StreamRecord}} element.
{code:title=AbstractCEPBasePatternOperator.java | borderStyle=solid}
...
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
    // copy the StreamRecord so that it cannot be changed
    priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
} else {
    priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
...
{code}
So, what are your suggestions for fixing this? I'd like to work on it (and surely also the PR of FLINK-1526). > ConnectedComponents is broken when object reuse enabled > --- > > Key: FLINK-5891 > URL: https://issues.apache.org/jira/browse/FLINK-5891 > Project: Flink > Issue Type: Bug > Components: Gelly > Affects Versions: 1.3.0 > Reporter: Greg Hogan > > {{org.apache.flink.graph.library.ConnectedComponents.CCUpdater#updateVertex}} > is storing a value from its iterator. > {{GSAConnectedComponents}} does not have this limitation. > {code}
> public static final class CCUpdater<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
>     @Override
>     public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
>         VV current = vertex.getValue();
>         VV min = current;
>         for (VV msg : messages) {
>             if (msg.compareTo(min) < 0) {
>                 min = msg;
>             }
>         }
>         if (!min.equals(current)) {
>             setNewVertexValue(min);
>         }
>     }
> }
> {code}
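One possible fix, sketched below: copy a message before holding a reference to it across the iterator's next() call, mirroring the CEP code quoted above. The TypeSerializer field used for the copy is a hypothetical addition, not Gelly's actual fix:
{code}
// Sketch only: "serializer" is an assumed TypeSerializer<VV> for the vertex
// value type; how it would be injected is left open here.
public static final class CCUpdater<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {

    private final TypeSerializer<VV> serializer;

    public CCUpdater(TypeSerializer<VV> serializer) {
        this.serializer = serializer;
    }

    @Override
    public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messages) throws Exception {
        VV current = vertex.getValue();
        VV min = current;
        for (VV msg : messages) {
            if (msg.compareTo(min) < 0) {
                // own the value instead of keeping a reference to the reused instance
                min = serializer.copy(msg);
            }
        }
        if (!min.equals(current)) {
            setNewVertexValue(min);
        }
    }
}
{code}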
[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers
[ https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15882007#comment-15882007 ] ASF GitHub Bot commented on FLINK-5692: --- Github user jinmingjian commented on the issue: https://github.com/apache/flink/pull/3373 @StephanEwen Just my coding habit. Correction done. And I very much appreciate your review. I am open for more contributions! :tada: > Add an Option to Deactivate Kryo Fallback for Serializers > - > > Key: FLINK-5692 > URL: https://issues.apache.org/jira/browse/FLINK-5692 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System > Affects Versions: 1.2.0 > Reporter: Stephan Ewen > Assignee: Jin Mingjian > Labels: easyfix, starter > > Some users want to avoid having Flink's serializers fall back to Kryo, as it can easily > become a hotspot in serialization. > For those users, it would help if there were a flag to "deactivate generic > types". Those users could then see where types are used that default to Kryo > and change these types (make them POJOs, Value types, or write custom > serializers). > There are two ways to approach that: > 1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would > create a Kryo serializer (when the respective flag is set in the > {{ExecutionConfig}}). > 2. Have a static flag on the {{TypeExtractor}} to throw an exception > whenever it would create a {{GenericTypeInfo}}. This approach has the > downside of introducing some static configuration to the TypeExtractor, but > may be more helpful because it throws exceptions in the programs at points > where the types are used (not where the serializers are created, which may be > much later).
[GitHub] flink issue #3373: [FLINK-5692] [config] Add an Option to Deactivate Kryo Fa...
Github user jinmingjian commented on the issue: https://github.com/apache/flink/pull/3373 @StephanEwen Just my coding habit. Correction done. And I very much appreciate your review. I am open for more contributions! :tada:
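Assuming the option lands as approach 1 from the issue (a flag on ExecutionConfig), usage would look roughly like the sketch below; the method name disableGenericTypes() is an assumption about the final API:
```java
// Sketch: opt out of the Kryo fallback so jobs fail fast wherever a type
// would otherwise silently default to a GenericTypeInfo/Kryo serializer.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableGenericTypes();  // assumed API name

// From here on, any type that would need the Kryo fallback raises an
// exception when its serializer is created, instead of degrading silently.
```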
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881999#comment-15881999 ] godfrey he commented on FLINK-5859: --- Hi [~fhueske], thanks for your advice. Yes, partition pruning is a kind of coarse-grained filter push-down; filter push-down and partition pruning share common parts, which are extracting predicates from the filter condition based on the interests of different data sources. But, IMO, filter push-down and partition pruning are independent concepts in general. The following table shows that different data sources have different traits: ||Trait||Example|| |filter push-down only|MySQL, HBase| |partition pruning only|CSV, TEXT| |both filter push-down and partition pruning|Parquet, Druid| IMO, we should provide a clear concept for developers, as [~ykt836] mentioned above, that includes both FilterableTableSource and PartitionableTableSource. Looking forward to your advice, thanks.
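For illustration, a hypothetical sketch of the PartitionableTableSource concept discussed above; the Partition type and both signatures are assumptions, not the actual proposal:
{code}
import java.util.List;

// Hypothetical counterpart to FilterableTableSource for coarse-grained,
// partition-level pruning.
interface PartitionableTableSource {

    /** All partitions of the underlying data, e.g. directories in HDFS. */
    List<Partition> getAllPartitions();

    /** Returns a source that reads only the partitions that survived pruning. */
    PartitionableTableSource applyPartitionPruning(List<Partition> remainingPartitions);
}

/** Opaque placeholder for partition metadata (spec, location, ...). */
class Partition { }
{code}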
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881992#comment-15881992 ] ASF GitHub Bot commented on FLINK-3679: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881632 (the diff context is reproduced in the GitHub message below): @haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here, before you jump back again into the code. This part is quite critical for Flink Kafka's exactly-once guarantee, so another pair of eyes on this will be safer. I would also like to do a thorough pass on your code and see if there are other problems, so you can work on those all together. Is that ok for you? Sorry for some more waiting. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector > Reporter: Jamie Grier > Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between inputs and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good, since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this, but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap.
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881632
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     keyPayload.get(keyBytes);
   }
-  final T value = deserializer.deserialize(keyBytes, valueBytes,
-    currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-  if (deserializer.isEndOfStream(value)) {
-    // remove partition from subscribed partitions.
-    partitionsIterator.remove();
-    continue partitionsLoop;
-  }
-
-  owner.emitRecord(value, currentPartition, offset);
+  final Collector<T> collector = new Collector<T>() {
--- End diff --
@haohui, if you don't mind, I would also wait for @rmetzger to take another look at the new proposals here, before you jump back again into the code. This part is quite critical for Flink Kafka's exactly-once guarantee, so another pair of eyes on this will be safer. I would also like to do a thorough pass on your code and see if there are other problems, so you can work on those all together. Is that ok for you? Sorry for some more waiting.
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881989#comment-15881989 ] ASF GitHub Bot commented on FLINK-3679: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881264 (the diff context is reproduced in the GitHub message below): I see what you are saying. The trade-off here is handing off the objects another time, but I think it's okay. I'll update the PR accordingly.
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881264
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     keyPayload.get(keyBytes);
   }
-  final T value = deserializer.deserialize(keyBytes, valueBytes,
-    currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-  if (deserializer.isEndOfStream(value)) {
-    // remove partition from subscribed partitions.
-    partitionsIterator.remove();
-    continue partitionsLoop;
-  }
-
-  owner.emitRecord(value, currentPartition, offset);
+  final Collector<T> collector = new Collector<T>() {
--- End diff --
I see what you are saying. The trade-off here is handing off the objects another time, but I think it's okay. I'll update the PR accordingly.
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881986#comment-15881986 ] ASF GitHub Bot commented on FLINK-3679: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881092 (the diff context and proposed code are reproduced in the GitHub message below): @haohui hmm this seems a bit odd to me. I think it should be achievable. Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something. Let me know what you think :)
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102881092
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ---
@@ -373,16 +370,28 @@ else if (partitionsRemoved) {
     keyPayload.get(keyBytes);
   }
-  final T value = deserializer.deserialize(keyBytes, valueBytes,
-    currentPartition.getTopic(), currentPartition.getPartition(), offset);
-
-  if (deserializer.isEndOfStream(value)) {
-    // remove partition from subscribed partitions.
-    partitionsIterator.remove();
-    continue partitionsLoop;
-  }
-
-  owner.emitRecord(value, currentPartition, offset);
+  final Collector<T> collector = new Collector<T>() {
--- End diff --
@haohui hmm this seems a bit odd to me. I think it should be achievable.
```
// the buffer; this can be shared
final List<T> bufferedElements = new LinkedList<>();

// BufferCollector is an implementation of Collector that adds collected
// elements to bufferedElements; this can be shared
final BufferCollector<T> collector = new BufferCollector<>(bufferedElements);

...

for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    deserializer.deserialize(
        record.key(),
        record.value(),
        record.topic(),
        record.partition(),
        record.offset(),
        collector);

    emitRecords(bufferedElements, partitionState, record.offset(), record);

    // after the elements for the record have been emitted, empty out the buffer
    bufferedElements.clear();
}
```
Doesn't this work? I haven't really tried this hands-on, so I might be overlooking something. Let me know what you think :)
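A minimal sketch of the BufferCollector referenced in the proposal above, assuming Flink's org.apache.flink.util.Collector interface (the class itself is hypothetical, not part of the PR as quoted):
```java
import java.util.List;
import org.apache.flink.util.Collector;

// Appends every collected element to a caller-provided buffer so the fetcher
// can emit the buffered elements afterwards under its checkpoint lock.
class BufferCollector<T> implements Collector<T> {

    private final List<T> buffer;

    BufferCollector(List<T> buffer) {
        this.buffer = buffer;
    }

    @Override
    public void collect(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {
        // nothing to release; the buffer is owned by the caller
    }
}
```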
[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 5:12 AM: - Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks (which doesn't make sense), and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of the KPL for the implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing the KPL that is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than the KPL changing the binary file to be different across processes, but that's not something we can really push. was (Author: tzulitai): Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of the KPL for the implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing the KPL that is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than the KPL changing the binary file to be different across processes, but that's not something we can really push. > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector > Affects Versions: 1.2.0 > Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... (sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. > A stack-trace follows: > {noformat}
> java.lang.RuntimeException: Could not copy native binaries to temp directory /tmp/amazon-kinesis-producer-native-binaries
> 	at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849)
> 	at com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243)
> 	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.SecurityException: The contents of the binary /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 is not what it's expected to be.
> 	at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822)
> 	... 8 more
> {noformat}
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881976#comment-15881976 ] Tzu-Li (Gordon) Tai commented on FLINK-5898: That's great! Thanks a lot for the efforts and please keep us posted :-)
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881967#comment-15881967 ] Scott Kidder commented on FLINK-5898: - Hi [~tzulitai], I'll look into fixing this in the KPL. I noticed that the method that installs the KPL binary uses a shared lock, which would allow multiple processes to obtain overlapping locks and write to the same file simultaneously: https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java#L815 I'll try patching the KPL to obtain an exclusive lock. I'll also file a GitHub issue against the KPL to see what the KPL authors think.
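For illustration, the difference between the shared lock taken by the linked KPL code and an exclusive one, sketched with plain java.nio (this is not the actual KPL code, and the lock-file path is hypothetical):
{code}
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockModes {
    public static void main(String[] args) throws Exception {
        Path lockFile = Paths.get("/tmp/amazon-kinesis-producer-native-binaries/extract.lock");
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {

            // Shared lock: several processes can hold it at the same time, so
            // it does not serialize the binary extraction.
            FileLock shared = channel.lock(0L, Long.MAX_VALUE, true);
            shared.release();

            // Exclusive lock: granted only when no other process holds a lock
            // on the file, which would serialize the extraction.
            FileLock exclusive = channel.lock(0L, Long.MAX_VALUE, false);
            exclusive.release();
        }
    }
}
{code}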
[jira] [Comment Edited] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai edited comment on FLINK-5898 at 2/24/17 4:53 AM: - Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of the KPL for the implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? If there is a possible way to solve this without replacing the KPL that is within our reach, then I'm against considering the replacement. Right now I just don't see a possible solution other than the KPL changing the binary file to be different across processes, but that's not something we can really push. was (Author: tzulitai): Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of the KPL for the implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think?
[jira] [Commented] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
[ https://issues.apache.org/jira/browse/FLINK-5898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881949#comment-15881949 ] Tzu-Li (Gordon) Tai commented on FLINK-5898: Thanks for looking into the issue [~skidder]. This seems tricky. It isn't possible to share the {{KinesisProducer}} across the subtasks, and there's no means to coordinate multiple subtasks to synchronize this access either. I'm not sure how we should deal with this one ... It does however bring up the question again of whether or not we should use the low-level Java SDK instead of KPL for implementation of {{FlinkKinesisProducer}}. [~rmetzger] what do you think? > Race-Condition with Amazon Kinesis KPL > -- > > Key: FLINK-5898 > URL: https://issues.apache.org/jira/browse/FLINK-5898 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.2.0 >Reporter: Scott Kidder > > The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer > Library (KPL) to send messages to Kinesis streams. The KPL relies on a native > binary client to send messages to achieve better performance. > When a Kinesis Producer is instantiated, the KPL will extract the native > binary to a sub-directory of `/tmp` (or whatever the platform-specific > temporary directory happens to be). > The KPL tries to prevent multiple processes from extracting the binary at the > same time by wrapping the operation in a mutex. Unfortunately, this does not > prevent multiple Flink cores from trying to perform this operation at the > same time. If two or more processes attempt to do this at the same time, then > the native binary in /tmp will be corrupted. > The authors of the KPL are aware of this possibility and suggest that users > of the KPL not do that ... (sigh): > https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 > I encountered this in my production environment when bringing up a new Flink > task-manager with multiple cores and restoring from an earlier savepoint, > resulting in the instantiation of a KPL client on each core at roughly the > same time. > A stack-trace follows: > {noformat} > java.lang.RuntimeException: Could not copy native binaries to temp directory > /tmp/amazon-kinesis-producer-native-binaries > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) > at > com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.SecurityException: The contents of the binary > /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 > is not what it's expected to be. > at > com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) > ... 8 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
[ https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881900#comment-15881900 ] ASF GitHub Bot commented on FLINK-5899: --- GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3405 [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate I have changed the supportPartial to false for all built-in Aggregates, and run all the UTs. Luckily this is the only bug we have so far. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F5899-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3405 commit bb233179d06325b481fe0e2a903a55c547529f06 Author: shaoxuan-wang Date: 2017-02-24T03:57:44Z [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate > Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate > - > > Key: FLINK-5899 > URL: https://issues.apache.org/jira/browse/FLINK-5899 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > The row length used to initialize > DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think > this is introduced by mistake when merging the code). > We currently lack the built-in non-partial-merge Aggregates. Therefore this > has not been captured by the unit test. > Reproduce step: > 1. set the "supportPartial" to false for SumAggregate > 2. Then both testAllEventTimeTumblingWindowOverTime and > testEventTimeTumblingGroupWindowOverTime will fail. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3405: [FLINK-5899] [table] Fix the bug in EventTimeTumbl...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3405 [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate I have changed the supportPartial to false for all built-in Aggregates, and run all the UTs. Luckily this is the only bug we have so far. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F5899-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3405 commit bb233179d06325b481fe0e2a903a55c547529f06 Author: shaoxuan-wang Date: 2017-02-24T03:57:44Z [FLINK-5899] [table] Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
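For anyone following the reproduce steps quoted above, a self-contained model of step 1; this is not Flink's actual class hierarchy, only an illustration of what flipping `supportPartial` means:

{noformat}
// Illustrative model only: mirrors the supportPartial switch named in the
// JIRA reproduce steps, not the real flink-table classes.
trait AggregateSketch[T] {
  def supportPartial: Boolean
}

class NonPartialSumSketch extends AggregateSketch[Int] {
  // Step 1: report that partial merging is unsupported, which forces the
  // planner onto the non-partial code path where the row-length bug lived.
  override def supportPartial: Boolean = false
}

object ReproduceStepOne extends App {
  println(new NonPartialSumSketch().supportPartial) // false
}
{noformat}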
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir set as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 > Can we just use the ${project.build.directory} as java.io.tmpdir ? @wenlong88 Sorry for the late reply. It's a good question. If we use `${project.build.directory}` without the `tmp` subdirectory, the unit tests will create various directories that may overlap with other directories such as `classes`/`surefire-reports`. Using a dedicated `tmp` directory avoids such conflicts. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5546) java.io.tmpdir set as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881895#comment-15881895 ] ASF GitHub Bot commented on FLINK-5546: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 > Can we just use the ${project.build.directory} as java.io.tmpdir ? @wenlong88 Sorry for the late reply. It's a good question. If we use `${project.build.directory}` without the `tmp` subdirectory, the unit tests will create various directories that may overlap with other directories such as `classes`/`surefire-reports`. Using a dedicated `tmp` directory avoids such conflicts. > java.io.tmpdir set as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Sub-task > Components: Build System > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > Fix For: 1.2.1 > > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to visit > the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
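The mechanism under discussion, sketched: tests resolve the `java.io.tmpdir` system property at runtime, so pointing surefire at a per-project directory such as `${project.build.directory}/tmp` gives each build its own writable location instead of the shared `/tmp`. The file name below is taken from the failure report; the rest is illustrative:

{noformat}
import java.nio.file.{Files, Paths}

object TmpDirSketch extends App {
  // Whatever surefire passes as -Djava.io.tmpdir is what the tests see here.
  val tmp = Paths.get(System.getProperty("java.io.tmpdir"))
  Files.createDirectories(tmp)
  // With the shared /tmp, a second Linux user hits "Permission denied" on
  // this path; with a per-build tmp directory the collision cannot happen.
  val cacheFile = tmp.resolve("cacheFile")
  Files.deleteIfExists(cacheFile)
  Files.createFile(cacheFile)
  println(s"created $cacheFile")
}
{noformat}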
[jira] [Updated] (FLINK-5899) Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate
[ https://issues.apache.org/jira/browse/FLINK-5899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-5899: - Summary: Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate (was: Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction) > Fix the bug in EventTimeTumblingWindow for non-partialMerge aggregate > - > > Key: FLINK-5899 > URL: https://issues.apache.org/jira/browse/FLINK-5899 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > The row length used to initialize > DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think > this is introduced by mistake when merging the code). > We currently lack the built-in non-partial-merge Aggregates. Therefore this > has not been captured by the unit test. > Reproduce step: > 1. set the "supportPartial" to false for SumAggregate > 2. Then both testAllEventTimeTumblingWindowOverTime and > testEventTimeTumblingGroupWindowOverTime will fail. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5900) Add non-partial merge Aggregates and unit tests
Shaoxuan Wang created FLINK-5900: Summary: Add non-partial merge Aggregates and unit tests Key: FLINK-5900 URL: https://issues.apache.org/jira/browse/FLINK-5900 Project: Flink Issue Type: Improvement Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang All current built-in aggregates support partial merge, so we have no test coverage telling us whether the non-partial aggregate path works. We should add non-partial-merge Aggregates and unit tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5899) Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction
Shaoxuan Wang created FLINK-5899: Summary: Fix the bug in initializing the DataSetTumbleTimeWindowAggReduceGroupFunction Key: FLINK-5899 URL: https://issues.apache.org/jira/browse/FLINK-5899 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang The row length used to initialize DataSetTumbleTimeWindowAggReduceGroupFunction was not set properly. (I think this is introduced by mistake when merging the code). We currently lack the built-in non-partial-merge Aggregates. Therefore this has not been captured by the unit test. Reproduce step: 1. set the "supportPartial" to false for SumAggregate 2. Then both testAllEventTimeTumblingWindowOverTime and testEventTimeTumblingGroupWindowOverTime will fail. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5586) Extend TableProgramsTestBase for object reuse modes
[ https://issues.apache.org/jira/browse/FLINK-5586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881829#comment-15881829 ] ASF GitHub Bot commented on FLINK-5586: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3339 rebased to the latest master. > Extend TableProgramsTestBase for object reuse modes > --- > > Key: FLINK-5586 > URL: https://issues.apache.org/jira/browse/FLINK-5586 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Kurt Young > > We should also test if all runtime operators of the Table API work correctly > if object reuse mode is set to true. This should be done for all > cluster-based ITCases, not the collection-based ones. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3339: [FLINK-5586] [table] Extend TableProgramsClusterTestBase ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3339 rebased to the latest master. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor
[ https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5790. --- Resolution: Fixed Fixed via d47446cafffe0d34d89488f6eb860aa139ceb3f1 > Use list types when ListStateDescriptor extends StateDescriptor > --- > > Key: FLINK-5790 > URL: https://issues.apache.org/jira/browse/FLINK-5790 > Project: Flink > Issue Type: Improvement >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Flink keeps the state serializer in {{StateDescriptor}}, but it's the > serializer of list elements that is put in {{ListStateDescriptor}}. The > implementation is a little confusing. Some backends need to construct the > state serializer with the element serializer by themselves. > We should use an {{ArrayListSerializer}}, which is composed of the serializer > of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid > constructing the state serializer. > If a backend needs customized serialization of the state (e.g. > {{RocksDBStateBackend}}), it still can obtain the element serializer from the > {{ArrayListSerializer}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
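A minimal sketch of the composition described in the issue, assuming the runtime `ArrayListSerializer` mentioned above takes the element serializer in its constructor (the class locations are as I understand them, and the descriptor wiring is simplified here):

{noformat}
import java.util.{ArrayList => JArrayList}
import org.apache.flink.api.common.typeutils.base.IntSerializer
import org.apache.flink.runtime.state.ArrayListSerializer

object ListStateSerializerSketch extends App {
  // The descriptor only needs the element serializer; the list-level state
  // serializer is composed from it, so a backend no longer has to build
  // its own state serializer.
  val elementSerializer = IntSerializer.INSTANCE
  val stateSerializer =
    new ArrayListSerializer[java.lang.Integer](elementSerializer)

  // round-trip a list instance through the composed serializer
  val list = new JArrayList[java.lang.Integer]()
  list.add(42)
  println(stateSerializer.copy(list)) // [42]
}
{noformat}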
[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments
[ https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881789#comment-15881789 ] ASF GitHub Bot commented on FLINK-5881: --- Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102865002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -78,20 +78,7 @@ object UserDefinedFunctionUtils { function: UserDefinedFunction, signature: Seq[TypeInformation[_]]) : Option[Array[Class[_]]] = { -// We compare the raw Java classes not the TypeInformation. -// TypeInformation does not matter during runtime (e.g. within a MapFunction). -val actualSignature = typeInfoToClass(signature) -val signatures = getSignatures(function) - -signatures - // go over all signatures and find one matching actual signature - .find { curSig => - // match parameters of signature to actual parameters - actualSignature.length == curSig.length && -curSig.zipWithIndex.forall { case (clazz, i) => - parameterTypeEquals(actualSignature(i), clazz) -} -} --- End diff -- I deleted them, because both methods are simply copy and paste. One was used for ScalarFunction, the other was used for TableFunction. > ScalarFunction(UDF) should support variable types and variable arguments > - > > Key: FLINK-5881 > URL: https://issues.apache.org/jira/browse/FLINK-5881 > Project: Flink > Issue Type: Sub-task >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > As a sub-task of FLINK-5826. We would like to support the ScalarFunction > first and make the review a little bit easier. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5881) ScalarFunction(UDF) should support variable types and variable arguments
[ https://issues.apache.org/jira/browse/FLINK-5881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881790#comment-15881790 ] ASF GitHub Bot commented on FLINK-5881: --- Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102864763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var noVargs = true + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + if (method.isVarArgs) { +noVargs = false + } else if (signatures.last.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } +} + }) + if (trailingSeq && noVargs) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- This is correct. Because if there are multiple methods found (override), it will throw another exception. > ScalarFunction(UDF) should support variable types and variable arguments > - > > Key: FLINK-5881 > URL: https://issues.apache.org/jira/browse/FLINK-5881 > Project: Flink > Issue Type: Sub-task >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > As a sub-task of FLINK-5826. We would like to support the ScalarFunction > first and make the review a little bit easier. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102864763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var noVargs = true + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + if (method.isVarArgs) { +noVargs = false + } else if (signatures.last.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } +} + }) + if (trailingSeq && noVargs) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- This is correct. Because if there are multiple methods found (override), it will throw another exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102865002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -78,20 +78,7 @@ object UserDefinedFunctionUtils { function: UserDefinedFunction, signature: Seq[TypeInformation[_]]) : Option[Array[Class[_]]] = { -// We compare the raw Java classes not the TypeInformation. -// TypeInformation does not matter during runtime (e.g. within a MapFunction). -val actualSignature = typeInfoToClass(signature) -val signatures = getSignatures(function) - -signatures - // go over all signatures and find one matching actual signature - .find { curSig => - // match parameters of signature to actual parameters - actualSignature.length == curSig.length && -curSig.zipWithIndex.forall { case (clazz, i) => - parameterTypeEquals(actualSignature(i), clazz) -} -} --- End diff -- I deleted them, because both methods are simply copy and paste. One was used for ScalarFunction, the other was used for TableFunction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
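Background for the Seq-versus-varargs check in the diffs above: a Scala `T*` parameter compiles to `scala.collection.Seq`, and only the `@varargs` annotation makes the compiler also emit a genuine Java `T...` signature. A sketch of a variable-argument eval, assuming the usual `ScalarFunction` base class in `org.apache.flink.table.functions` (sketch only, not the PR's code):

{noformat}
import scala.annotation.varargs
import org.apache.flink.table.functions.ScalarFunction

class ConcatFunc extends ScalarFunction {
  // @varargs makes scalac emit an extra eval(String, String...) bridge, so
  // the trailing parameter shows up as a real varargs array rather than only
  // scala.collection.Seq -- the case the validation above guards against.
  @varargs
  def eval(sep: String, parts: String*): String = parts.mkString(sep)
}

object VarargsUdfSketch extends App {
  println(new ConcatFunc().eval("-", "a", "b", "c")) // a-b-c
}
{noformat}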
[jira] [Closed] (FLINK-5863) Unify the serialization of queryable list states in different backends
[ https://issues.apache.org/jira/browse/FLINK-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaogang Shi closed FLINK-5863. --- Resolution: Fixed Now that we are refactoring the queryable states, we can make the changes then. > Unify the serialization of queryable list states in different backends > -- > > Key: FLINK-5863 > URL: https://issues.apache.org/jira/browse/FLINK-5863 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.3.0 >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi >Priority: Minor > > Now the deserialization of list states is implemented in > {{KvStateRequestSerializer}}. The serialization however is implemented > individually in different backends. > We should provide a method in {{KvStateRequestSerializer}} to remove the > redundant code. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881755#comment-15881755 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3336 > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > Fix For: 1.3.0 > > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/3336 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
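To make the cost argument in the issue above concrete, a sketch of per-entry access with the new `MapState` (descriptor and accessor names as added for 1.3; treat the details as illustrative):

{noformat}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Counts events per (key, category). Only the single touched entry is
// (de)serialized per update -- unlike a whole map stored in a ValueState,
// which pays for every entry on every access.
class CategoryCounter
    extends RichFlatMapFunction[(String, String), (String, Long)] {

  private var counts: MapState[String, java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new MapStateDescriptor[String, java.lang.Long](
      "category-counts", classOf[String], classOf[java.lang.Long])
    counts = getRuntimeContext.getMapState(descriptor)
  }

  override def flatMap(in: (String, String),
                       out: Collector[(String, Long)]): Unit = {
    val updated = Option(counts.get(in._2)).map(_.longValue).getOrElse(0L) + 1
    counts.put(in._2, updated)
    out.collect((in._2, updated))
  }
}
{noformat}

Used on a keyed stream, e.g. `stream.keyBy(_._1).flatMap(new CategoryCounter)`.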
[jira] [Commented] (FLINK-5859) support partition pruning on Table API & SQL
[ https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881737#comment-15881737 ] Kurt Young commented on FLINK-5859: --- Hi [~fhueske], How about this approach: we provide both {{FilterableTableSource}} and {{PartitionableTableSource}}, keep {{FilterableTableSource}} as it is, and add methods like {{getAllPartitions}} and {{applyPartitionPruning}} to {{PartitionableTableSource}}. From a developer's point of view, we can treat these two traits as completely independent. It will be easier for a developer to implement each functionality independently compared with mixing all the logic into {{FilterableTableSource.setPredicate()}}. Also, in the future I think it is very likely that these two traits will be applied by the framework in different optimization stages: we apply partition pruning as early as possible in the logical optimization, and let filter pushdown be applied a little later because it requires some heavyweight physical-level analysis first. BTW, this approach can still accommodate the one you suggested: you can implement {{FilterableTableSource}} only and do all the pruning and filtering there if you like. > support partition pruning on Table API & SQL > > > Key: FLINK-5859 > URL: https://issues.apache.org/jira/browse/FLINK-5859 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > Many data sources are partitionable storage, e.g. HDFS, Druid. And many > queries just need to read a small subset of the total data. We can use > partition information to prune or skip over files irrelevant to the user’s > queries. Both query optimization time and execution time can be significantly > reduced, especially for a large partitioned table. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
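A sketch of how the trait split proposed above might look, using the method names from the comment; the `Partition` type and all signatures are assumptions for illustration, not a settled design:

{noformat}
// Sketch only -- the design was still under discussion in this thread.
case class Partition(spec: Map[String, String])

trait PartitionableTableSource {
  // enumerate all partitions so the optimizer can prune early, during
  // logical optimization
  def getAllPartitions: Seq[Partition]

  // hand the source the partitions that survived pruning
  def applyPartitionPruning(remainingPartitions: Seq[Partition]): Unit
}

// FilterableTableSource stays as it is; a source that wants both behaviors
// simply mixes the two traits in independently.
{noformat}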
[jira] [Created] (FLINK-5898) Race-Condition with Amazon Kinesis KPL
Scott Kidder created FLINK-5898: --- Summary: Race-Condition with Amazon Kinesis KPL Key: FLINK-5898 URL: https://issues.apache.org/jira/browse/FLINK-5898 Project: Flink Issue Type: Bug Components: Kinesis Connector Affects Versions: 1.2.0 Reporter: Scott Kidder The Flink Kinesis streaming-connector uses the Amazon Kinesis Producer Library (KPL) to send messages to Kinesis streams. The KPL relies on a native binary client to send messages to achieve better performance. When a Kinesis Producer is instantiated, the KPL will extract the native binary to a sub-directory of `/tmp` (or whatever the platform-specific temporary directory happens to be). The KPL tries to prevent multiple processes from extracting the binary at the same time by wrapping the operation in a mutex. Unfortunately, this does not prevent multiple Flink cores from trying to perform this operation at the same time. If two or more processes attempt to do this at the same time, then the native binary in /tmp will be corrupted. The authors of the KPL are aware of this possibility and suggest that users of the KPL not do that ... (sigh): https://github.com/awslabs/amazon-kinesis-producer/issues/55#issuecomment-251408897 I encountered this in my production environment when bringing up a new Flink task-manager with multiple cores and restoring from an earlier savepoint, resulting in the instantiation of a KPL client on each core at roughly the same time. A stack-trace follows: {noformat} java.lang.RuntimeException: Could not copy native binaries to temp directory /tmp/amazon-kinesis-producer-native-binaries at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:849) at com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:243) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.open(FlinkKinesisProducer.java:198) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.SecurityException: The contents of the binary /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_e9a87c761db92a73eb74519a4468ee71def87eb2 is not what it's expected to be. at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:822) ... 8 more {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...
GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3370 [FLINK-5710] Add ProcTime() function to indicate StreamSQL. This is the commit we used internally -- There are no unit tests associated with this PR. It simply serves as a reference point for #3302. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3370 commit e68a7ad22cad926dac2f211fa3bd56ef481c4036 Author: Haohui Mai Date: 2017-02-23T21:51:45Z [FLINK-5710] Add ProcTime() function to indicate StreamSQL. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881507#comment-15881507 ] ASF GitHub Bot commented on FLINK-5710: --- GitHub user haohui reopened a pull request: https://github.com/apache/flink/pull/3370 [FLINK-5710] Add ProcTime() function to indicate StreamSQL. This is the commit we used internally -- There are no unit tests associated with this PR. It simply serves as a reference point for #3302. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3370 commit e68a7ad22cad926dac2f211fa3bd56ef481c4036 Author: Haohui Mai Date: 2017-02-23T21:51:45Z [FLINK-5710] Add ProcTime() function to indicate StreamSQL. > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826611 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] { if (value != null) { val v = value.asInstanceOf[BigDecimal] val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]] - if (accum.max == null || accum.max.compareTo(v) < 0) { -accum.max = v + if (!accum.f1 || accum.f0.compareTo(v) < 0) { +accum.f0 = v +if (!accum.f1) { --- End diff -- remove condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881427#comment-15881427 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826611 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -109,8 +119,11 @@ class DecimalMaxAggFunction extends MaxAggFunction[BigDecimal] { if (value != null) { val v = value.asInstanceOf[BigDecimal] val accum = accumulator.asInstanceOf[MaxAccumulator[BigDecimal]] - if (accum.max == null || accum.max.compareTo(v) < 0) { -accum.max = v + if (!accum.f1 || accum.f0.compareTo(v) < 0) { +accum.f0 = v +if (!accum.f1) { --- End diff -- remove condition. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881424#comment-15881424 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835657 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala --- @@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun /** The initial accumulator for Min aggregate function */ class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T` > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835585 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { /** The initial accumulator for Sum aggregate function */ class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T`. It will then be passed down from the parent class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828705 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] { if (value != null) { val v = value.asInstanceOf[BigDecimal] val accum = accumulator.asInstanceOf[DecimalSumAccumulator] - if (accum.sum == null) { -accum.sum = v + if (accum.f1 == false) { --- End diff -- remove condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881428#comment-15881428 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826652 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala --- @@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[MinAccumulator[T]] - if (accum.max == null || ord.compare(accum.max, v) > 0) { -accum.max = v + val a = accumulator.asInstanceOf[MinAccumulator[T]] + if (!a.f1 || ord.compare(a.f0, v) > 0) { +a.f0 = v +if (!a.f1) { --- End diff -- Remove condition > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828543 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[SumAccumulator[T]] - if (accum.sum == null.asInstanceOf[T]) { -accum.sum = v + val a = accumulator.asInstanceOf[SumAccumulator[T]] + if (!a.f1) { --- End diff -- since we start with `sum = 0` we don't need this condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881431#comment-15881431 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828705 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -127,30 +132,31 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] { if (value != null) { val v = value.asInstanceOf[BigDecimal] val accum = accumulator.asInstanceOf[DecimalSumAccumulator] - if (accum.sum == null) { -accum.sum = v + if (accum.f1 == false) { --- End diff -- remove condition. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881432#comment-15881432 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[MaxAccumulator[T]] - if (accum.max == null || ord.compare(accum.max, v) < 0) { -accum.max = v + val a = accumulator.asInstanceOf[MaxAccumulator[T]] + if (!a.f1 || ord.compare(a.f0, v) < 0) { +a.f0 = v +if (!a.f1) { --- End diff -- the condition can be removed. We can simply reassign `true`. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881425#comment-15881425 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828658 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { var i: Int = 0 while (i < accumulators.size()) { val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]] - if (ret.sum == null.asInstanceOf[T]) { -ret.sum = a.sum - } else if (a.sum != null.asInstanceOf[T]) { -ret.sum = numeric.plus(ret.sum, a.sum) + if (a.f1) { +if (!ret.f1) { --- End diff -- Since we start with `sum = 0` we don't need this condition. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881426#comment-15881426 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835712 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun /** The initial accumulator for Max aggregate function */ class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T` > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881430#comment-15881430 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835585 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { /** The initial accumulator for Sum aggregate function */ class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T`. It will then be passed down from the parent class. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881423#comment-15881423 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835493 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { /** The initial accumulator for Sum aggregate function */ class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { -var sum: T = null.asInstanceOf[T] +f0 = 0.asInstanceOf[T] //sum --- End diff -- change this to `f0 = numeric.zero` > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5767) New aggregate function interface and built-in aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881429#comment-15881429 ] ASF GitHub Bot commented on FLINK-5767: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828543 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -44,21 +44,22 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[SumAccumulator[T]] - if (accum.sum == null.asInstanceOf[T]) { -accum.sum = v + val a = accumulator.asInstanceOf[SumAccumulator[T]] + if (!a.f1) { --- End diff -- since we start with `sum = 0` we don't need this condition. > New aggregate function interface and built-in aggregate functions > - > > Key: FLINK-5767 > URL: https://issues.apache.org/jira/browse/FLINK-5767 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add a new aggregate function interface. This includes implementing the > aggregate interface, migrating the existing aggregation functions to this > interface, and adding the unit tests for these functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835657 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala --- @@ -31,7 +31,8 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun /** The initial accumulator for Min aggregate function */ class MinAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835712 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -31,7 +31,8 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun /** The initial accumulator for Max aggregate function */ class MaxAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { --- End diff -- We can remove the type `T` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102828658 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -67,10 +68,13 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { var i: Int = 0 while (i < accumulators.size()) { val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]] - if (ret.sum == null.asInstanceOf[T]) { -ret.sum = a.sum - } else if (a.sum != null.asInstanceOf[T]) { -ret.sum = numeric.plus(ret.sum, a.sum) + if (a.f1) { +if (!ret.f1) { --- End diff -- Since we start with `sum = 0` we don't need this condition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102835493 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala --- @@ -32,7 +31,8 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] { /** The initial accumulator for Sum aggregate function */ class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator { -var sum: T = null.asInstanceOf[T] +f0 = 0.asInstanceOf[T] //sum --- End diff -- change this to `f0 = numeric.zero` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826382 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunction.scala --- @@ -41,24 +42,33 @@ abstract class MaxAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[MaxAccumulator[T]] - if (accum.max == null || ord.compare(accum.max, v) < 0) { -accum.max = v + val a = accumulator.asInstanceOf[MaxAccumulator[T]] + if (!a.f1 || ord.compare(a.f0, v) < 0) { +a.f0 = v +if (!a.f1) { --- End diff -- the condition can be removed. We can simply reassign `true`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
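With the inner guard dropped, `accumulate` reduces to the sketch below (reassigning `true` is idempotent, so no guard is needed); the same simplification applies to `MinAggFunction` in the following comment.

```scala
// Sketch only: the flag is set unconditionally whenever a new max is recorded.
override def accumulate(accumulator: Accumulator, value: Any) = {
  if (value != null) {
    val v = value.asInstanceOf[T]
    val a = accumulator.asInstanceOf[MaxAccumulator[T]]
    if (!a.f1 || ord.compare(a.f0, v) < 0) {
      a.f0 = v
      a.f1 = true
    }
  }
}
```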
[GitHub] flink pull request #3354: [FLINK-5767] [Table] New aggregate function interf...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3354#discussion_r102826652 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunction.scala --- @@ -41,24 +42,33 @@ abstract class MinAggFunction[T](implicit ord: Ordering[T]) extends AggregateFun override def accumulate(accumulator: Accumulator, value: Any) = { if (value != null) { val v = value.asInstanceOf[T] - val accum = accumulator.asInstanceOf[MinAccumulator[T]] - if (accum.max == null || ord.compare(accum.max, v) > 0) { -accum.max = v + val a = accumulator.asInstanceOf[MinAccumulator[T]] + if (!a.f1 || ord.compare(a.f0, v) > 0) { +a.f0 = v +if (!a.f1) { --- End diff -- Remove condition --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881376#comment-15881376 ] ASF GitHub Bot commented on FLINK-3679: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102830609 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- Good catch, @tzulitai! I tried the buffer approach and had no luck. The problem is that calling `emitRecord` needs to pass in both the offset and the record itself -- the record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer itself needs to buffer the deserialized value and the record itself -- it cannot solve the problem of having a collector per record. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier >Assignee: Haohui Mai > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3314: [FLINK-3679] DeserializationSchema should handle z...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3314#discussion_r102830609 --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java --- @@ -373,16 +370,28 @@ else if (partitionsRemoved) { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); - - if (deserializer.isEndOfStream(value)) { - // remove partition from subscribed partitions. - partitionsIterator.remove(); - continue partitionsLoop; - } - - owner.emitRecord(value, currentPartition, offset); + final Collector collector = new Collector() { --- End diff -- Good catch, @tzulitai! I tried the buffer approach and had no luck. The problem is that calling `emitRecord` needs to pass in both the offset and the record itself -- the record is used to extract the timestamp in the Kafka 0.10 consumers. The buffer itself needs to buffer the deserialized value and the record itself -- it cannot solve the problem of having a collector per record. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
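To make the trade-off concrete, here is a rough sketch -- not code from the PR -- of the rejected buffer approach. `deserializer`, `owner`, `currentPartition`, and `offset` are the names from the diff; the collector-taking `deserialize` signature is assumed to be the one this PR introduces, and the fragment lives inside the generic fetch loop where `T` is bound. Note how every buffered value is emitted against the same offset, which is exactly the coupling the comment describes.

```scala
// Sketch only: one reusable collector that buffers deserialized records.
val buffer = new java.util.ArrayList[T]()
val reusableCollector = new org.apache.flink.util.Collector[T] {
  override def collect(record: T): Unit = buffer.add(record)
  override def close(): Unit = {}
}

// One Kafka message may yield zero or more records...
deserializer.deserialize(keyBytes, valueBytes,
  currentPartition.getTopic, currentPartition.getPartition, offset, reusableCollector)

// ...but they are all emitted with that single message's offset, and the Kafka
// 0.10 consumer additionally needs the original record for timestamp extraction.
var i = 0
while (i < buffer.size()) {
  owner.emitRecord(buffer.get(i), currentPartition, offset)
  i += 1
}
buffer.clear()
```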
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881329#comment-15881329 ] ASF GitHub Bot commented on FLINK-5710: --- Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3370 > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...
Github user haohui closed the pull request at: https://github.com/apache/flink/pull/3370 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5597) Improve the LocalClusteringCoefficient documentation
[ https://issues.apache.org/jira/browse/FLINK-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5597: -- Fix Version/s: 1.3.0 > Improve the LocalClusteringCoefficient documentation > > > Key: FLINK-5597 > URL: https://issues.apache.org/jira/browse/FLINK-5597 > Project: Flink > Issue Type: Improvement > Components: Documentation, Gelly >Affects Versions: 1.3.0 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > Fix For: 1.3.0 > > > The LocalClusteringCoefficient usage section should explain what the > algorithm output is and how to retrieve the actual local clustering coefficient > scores from it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3404: [FLINK-5597] [docs] Improve the LocalClusteringCoe...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3404 [FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 5597_improve_the_localclusteringcoefficient_documentation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3404.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3404 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5597) Improve the LocalClusteringCoefficient documentation
[ https://issues.apache.org/jira/browse/FLINK-5597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881320#comment-15881320 ] ASF GitHub Bot commented on FLINK-5597: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/3404 [FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 5597_improve_the_localclusteringcoefficient_documentation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3404.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3404 > Improve the LocalClusteringCoefficient documentation > > > Key: FLINK-5597 > URL: https://issues.apache.org/jira/browse/FLINK-5597 > Project: Flink > Issue Type: Improvement > Components: Documentation, Gelly >Affects Versions: 1.3.0 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > > > The LocalClusteringCoefficient usage section should explain what the > algorithm output is and how to retrieve the actual local clustering coefficient > scores from it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5242) Implement Scala API for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5242: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-2254 > Implement Scala API for BipartiteGraph > -- > > Key: FLINK-5242 > URL: https://issues.apache.org/jira/browse/FLINK-5242 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > Labels: features > > Should implement BipartiteGraph in flink-gelly-scala project similarly to > Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5243) Implement an example for BipartiteGraph
[ https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5243: -- Issue Type: Sub-task (was: New Feature) Parent: FLINK-2254 > Implement an example for BipartiteGraph > --- > > Key: FLINK-5243 > URL: https://issues.apache.org/jira/browse/FLINK-5243 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk > Labels: beginner > > Should implement example for BipartiteGraph in gelly-examples project > similarly to examples for Graph class. > Depends on this: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5245) Add support for BipartiteGraph mutations
[ https://issues.apache.org/jira/browse/FLINK-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5245: -- Issue Type: Sub-task (was: Improvement) Parent: FLINK-2254 > Add support for BipartiteGraph mutations > > > Key: FLINK-5245 > URL: https://issues.apache.org/jira/browse/FLINK-5245 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > Implement methods for adding and removing vertices and edges similarly to > Graph class. > Depends on https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5244) Implement methods for BipartiteGraph transformations
[ https://issues.apache.org/jira/browse/FLINK-5244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5244: -- Issue Type: Sub-task (was: Improvement) Parent: FLINK-2254 > Implement methods for BipartiteGraph transformations > > > Key: FLINK-5244 > URL: https://issues.apache.org/jira/browse/FLINK-5244 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > > BipartiteGraph should implement methods for transforming graph, like map, > filter, join, union, difference, etc. similarly to Graph class. > Depends on: https://issues.apache.org/jira/browse/FLINK-2254 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5362) Implement methods to access BipartiteGraph properties
[ https://issues.apache.org/jira/browse/FLINK-5362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-5362: -- Issue Type: Sub-task (was: Improvement) Parent: FLINK-2254 > Implement methods to access BipartiteGraph properties > - > > Key: FLINK-5362 > URL: https://issues.apache.org/jira/browse/FLINK-5362 > Project: Flink > Issue Type: Sub-task > Components: Gelly >Reporter: Ivan Mushketyk >Assignee: Ivan Mushketyk > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext
[ https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15582930#comment-15582930 ] Ted Yu edited comment on FLINK-4848 at 2/23/17 7:26 PM: There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} was (Author: yuzhih...@gmail.com): There is a similar issue with trustStoreFilePath: {code} trustStoreFile = new FileInputStream(new File(trustStoreFilePath)); {code} > keystoreFilePath should be checked against null in > SSLUtils#createSSLServerContext > -- > > Key: FLINK-4848 > URL: https://issues.apache.org/jira/browse/FLINK-4848 > Project: Flink > Issue Type: Bug > Components: Security >Reporter: Ted Yu >Priority: Minor > > {code} > String keystoreFilePath = sslConfig.getString( > ConfigConstants.SECURITY_SSL_KEYSTORE, > null); > ... > try { > keyStoreFile = new FileInputStream(new File(keystoreFilePath)); > {code} > If keystoreFilePath is null, the File ctor would throw NPE. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
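A minimal sketch of the kind of guard the report asks for, applied to the keystore path; the exception type and message are assumptions, and the same guard would apply to trustStoreFilePath.

{code}
// Sketch only (Scala): fail with a clear error instead of an NPE from File's constructor.
val keystoreFilePath = sslConfig.getString(ConfigConstants.SECURITY_SSL_KEYSTORE, null)
if (keystoreFilePath == null) {
  throw new IllegalConfigurationException(
    ConfigConstants.SECURITY_SSL_KEYSTORE + " is not set, but SSL is enabled")
}
val keyStoreFile = new FileInputStream(new File(keystoreFilePath))
{code}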
[jira] [Resolved] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-3801. --- Resolution: Later > Upgrade Joda-Time library to 2.9.3 > -- > > Key: FLINK-3801 > URL: https://issues.apache.org/jira/browse/FLINK-3801 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Priority: Minor > > Currently joda-time 2.5 is used, which is very old. > We should upgrade to 2.9.3. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5763) Make savepoints self-contained and relocatable
[ https://issues.apache.org/jira/browse/FLINK-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15881045#comment-15881045 ] Stephan Ewen commented on FLINK-5763: - Second part implemented in 6e7a91741708a2b167a2bbca5dda5b2059df5e18 Followups implemented in df16e50bbf01d26f75b7745dacd5779ad47dcce5 > Make savepoints self-contained and relocatable > -- > > Key: FLINK-5763 > URL: https://issues.apache.org/jira/browse/FLINK-5763 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.3.0 > > > After a user has triggered a savepoint, a single savepoint file will be > returned as a handle to the savepoint. A savepoint to {{}} creates a > savepoint file like {{/savepoint-}}. > This file contains the metadata of the corresponding checkpoint, but not the > actual program state. While this works well for short term management > (pause-and-resume a job), it makes it hard to manage savepoints over longer > periods of time. > h4. Problems > h5. Scattered Checkpoint Files > For file system based checkpoints (FsStateBackend, RocksDBStateBackend) this > results in the savepoint referencing files from the checkpoint directory > (usually different than ). For users, it is virtually impossible to > tell which checkpoint files belong to a savepoint and which are lingering > around. This can easily lead to accidentally invalidating a savepoint by > deleting checkpoint files. > h5. Savepoints Not Relocatable > Even if a user is able to figure out which checkpoint files belong to a > savepoint, moving these files will invalidate the savepoint as well, because > the metadata file references absolute file paths. > h5. Forced to Use CLI for Disposal > Because of the scattered files, the user is in practice forced to use Flink’s > CLI to dispose a savepoint. This should be possible to handle in the scope of > the user’s environment via a file system delete operation. > h4. Proposal > In order to solve the described problems, savepoints should contain all their > state, both metadata and program state, inside a single directory. > Furthermore the metadata must only hold relative references to the checkpoint > files. This makes it obvious which files make up the state of a savepoint and > it is possible to move savepoints around by moving the savepoint directory. > h5. Desired File Layout > Triggering a savepoint to {{}} creates a directory as follows: > {code} > /savepoint-- > +-- _metadata > +-- data- [1 or more] > {code} > We include the JobID in the savepoint directory name in order to give some > hints about which job a savepoint belongs to. > h5. CLI > - Trigger: When triggering a savepoint to {{}} the savepoint > directory will be returned as the handle to the savepoint. > - Restore: Users can restore by pointing to the directory or the _metadata > file. The data files should be required to be in the same directory as the > _metadata file. > - Dispose: The disposal command should be deprecated and eventually removed. > While deprecated, disposal can happen by specifying the directory or the > _metadata file (same as restore). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5887) Make CheckpointBarrier type immutable
[ https://issues.apache.org/jira/browse/FLINK-5887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5887. --- > Make CheckpointBarrier type immutable > - > > Key: FLINK-5887 > URL: https://issues.apache.org/jira/browse/FLINK-5887 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The title says it all. Such a critical type that is referenced and compared > to should not be mutable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-5887) Make CheckpointBarrier type immutable
[ https://issues.apache.org/jira/browse/FLINK-5887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5887. - Resolution: Fixed Fix Version/s: 1.3.0 Fixed via 8ffe75a54f24cbd8e69c455b42a4e969b943a279 > Make CheckpointBarrier type immutable > - > > Key: FLINK-5887 > URL: https://issues.apache.org/jira/browse/FLINK-5887 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > The title says it all. Such a critical type that is referenced and compared > to should not be mutable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5541: -- Description: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). was: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). > Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
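For illustration, one possible shape of the missing check, sketched in Scala; the exception choice and message are assumptions.

{code}
// Sketch only: after the fallback loop, refuse to submit with a null jar path.
if (localJar == null) {
  throw new IllegalArgumentException(
    "Could not determine the jar file of the topology; please specify it explicitly.")
}
logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf)
client.submitTopologyWithOpts(name, localJar, topology)
{code}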
[jira] [Updated] (FLINK-5629) Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader()
[ https://issues.apache.org/jira/browse/FLINK-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5629: -- Description: {code} final RandomAccessFile raf; try { raf = new RandomAccessFile(file, "r"); ... long fileLength = raf.length(); {code} The RandomAccessFile should be closed upon return from the method. was: {code} final RandomAccessFile raf; try { raf = new RandomAccessFile(file, "r"); ... long fileLength = raf.length(); {code} The RandomAccessFile should be closed upon return from the method. > Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader() > -- > > Key: FLINK-5629 > URL: https://issues.apache.org/jira/browse/FLINK-5629 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Ted Yu >Priority: Minor > > {code} > final RandomAccessFile raf; > try { > raf = new RandomAccessFile(file, "r"); > ... > long fileLength = raf.length(); > {code} > The RandomAccessFile should be closed upon return from the method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
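The general fix pattern, sketched in Scala with try/finally. Caveat: in the actual Netty handler the file may have to stay open until the transfer completes, so the real fix would likely close it from a write-completion listener rather than a plain finally block.

{code}
// Sketch only: make sure the file handle is released on every exit path.
val raf = new RandomAccessFile(file, "r")
try {
  val fileLength = raf.length()
  // ... build and write the HTTP response from raf ...
} finally {
  raf.close()
}
{code}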
[jira] [Created] (FLINK-5897) Untie Checkpoint Externalization from FileSystems
Stephan Ewen created FLINK-5897: --- Summary: Untie Checkpoint Externalization from FileSystems Key: FLINK-5897 URL: https://issues.apache.org/jira/browse/FLINK-5897 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Affects Versions: 1.2.0 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.3.0 Currently, externalizing checkpoint metadata and storing savepoints depend strictly on FileSystems. Since state backends are more general, storing and cleaning up checkpoints with state backend hooks requires untying savepoints and externalized checkpoints from filesystems. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-5877) Fix Scala snippet in Async I/O API doc
[ https://issues.apache.org/jira/browse/FLINK-5877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5877. --- > Fix Scala snippet in Async I/O API doc > -- > > Key: FLINK-5877 > URL: https://issues.apache.org/jira/browse/FLINK-5877 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Andrea Sella >Assignee: Andrea Sella >Priority: Minor > Fix For: 1.3.0, 1.2.1 > > > The current `docs/dev/stream/asyncio.md` has an error in the Scala snippet: > it doesn't use the Scala AsyncFunction and the brackets are inconsistent. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-5877) Fix Scala snippet in Async I/O API doc
[ https://issues.apache.org/jira/browse/FLINK-5877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5877. - Resolution: Fixed Fix Version/s: 1.2.1 1.3.0 Fixed in - 1.2.1 via b5ec146413bedf55867e15652c7e29f1e4e2d220 - 1.3.0 via 2d2ffbad9684e879aa92473798701b7cfc0d1277 > Fix Scala snippet in Async I/O API doc > -- > > Key: FLINK-5877 > URL: https://issues.apache.org/jira/browse/FLINK-5877 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Andrea Sella >Assignee: Andrea Sella >Priority: Minor > Fix For: 1.3.0, 1.2.1 > > > The current `docs/dev/stream/asyncio.md` has an error in the Scala snippet: > it doesn't use the Scala AsyncFunction and the brackets are inconsistent. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/30c9e2b683bf7f4776ffc23b6a860946a4429ae5#commitcomment-21018833 Great to have this in! I would suggest mentioning in the JavaDocs of `MapState.size()` that this can be a potentially expensive operation, because it may entail iterating over many entries in some state backends (like RocksDB). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
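The suggested JavaDoc note could read roughly as follows; the wording is an assumption, shown here as a Scala signature for illustration.

```scala
/**
 * Returns the number of entries in this map state.
 *
 * NOTE: this can be a potentially expensive operation, since some state
 * backends (e.g. RocksDB) may have to iterate over all entries to
 * compute the size.
 */
def size(): Int
```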
[jira] [Closed] (FLINK-5763) Make savepoints self-contained and relocatable
[ https://issues.apache.org/jira/browse/FLINK-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5763. --- > Make savepoints self-contained and relocatable > -- > > Key: FLINK-5763 > URL: https://issues.apache.org/jira/browse/FLINK-5763 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.3.0 > > > After a user has triggered a savepoint, a single savepoint file will be > returned as a handle to the savepoint. A savepoint to {{}} creates a > savepoint file like {{/savepoint-}}. > This file contains the metadata of the corresponding checkpoint, but not the > actual program state. While this works well for short term management > (pause-and-resume a job), it makes it hard to manage savepoints over longer > periods of time. > h4. Problems > h5. Scattered Checkpoint Files > For file system based checkpoints (FsStateBackend, RocksDBStateBackend) this > results in the savepoint referencing files from the checkpoint directory > (usually different than ). For users, it is virtually impossible to > tell which checkpoint files belong to a savepoint and which are lingering > around. This can easily lead to accidentally invalidating a savepoint by > deleting checkpoint files. > h5. Savepoints Not Relocatable > Even if a user is able to figure out which checkpoint files belong to a > savepoint, moving these files will invalidate the savepoint as well, because > the metadata file references absolute file paths. > h5. Forced to Use CLI for Disposal > Because of the scattered files, the user is in practice forced to use Flink’s > CLI to dispose a savepoint. This should be possible to handle in the scope of > the user’s environment via a file system delete operation. > h4. Proposal > In order to solve the described problems, savepoints should contain all their > state, both metadata and program state, inside a single directory. > Furthermore the metadata must only hold relative references to the checkpoint > files. This makes it obvious which files make up the state of a savepoint and > it is possible to move savepoints around by moving the savepoint directory. > h5. Desired File Layout > Triggering a savepoint to {{}} creates a directory as follows: > {code} > /savepoint-- > +-- _metadata > +-- data- [1 or more] > {code} > We include the JobID in the savepoint directory name in order to give some > hints about which job a savepoint belongs to. > h5. CLI > - Trigger: When triggering a savepoint to {{}} the savepoint > directory will be returned as the handle to the savepoint. > - Restore: Users can restore by pointing to the directory or the _metadata > file. The data files should be required to be in the same directory as the > _metadata file. > - Dispose: The disposal command should be deprecated and eventually removed. > While deprecated, disposal can happen by specifying the directory or the > _metadata file (same as restore). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-5763) Make savepoints self-contained and relocatable
[ https://issues.apache.org/jira/browse/FLINK-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-5763. - Resolution: Fixed Fix Version/s: 1.3.0 Fixed in fcc1efcb05bce13e435946107a842727b1e3ee20 and 2edc97185700a5bdb3e181a71493d681c0f693e3 > Make savepoints self-contained and relocatable > -- > > Key: FLINK-5763 > URL: https://issues.apache.org/jira/browse/FLINK-5763 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Ufuk Celebi > Fix For: 1.3.0 > > > After a user has triggered a savepoint, a single savepoint file will be > returned as a handle to the savepoint. A savepoint to {{}} creates a > savepoint file like {{/savepoint-}}. > This file contains the metadata of the corresponding checkpoint, but not the > actual program state. While this works well for short term management > (pause-and-resume a job), it makes it hard to manage savepoints over longer > periods of time. > h4. Problems > h5. Scattered Checkpoint Files > For file system based checkpoints (FsStateBackend, RocksDBStateBackend) this > results in the savepoint referencing files from the checkpoint directory > (usually different than ). For users, it is virtually impossible to > tell which checkpoint files belong to a savepoint and which are lingering > around. This can easily lead to accidentally invalidating a savepoint by > deleting checkpoint files. > h5. Savepoints Not Relocatable > Even if a user is able to figure out which checkpoint files belong to a > savepoint, moving these files will invalidate the savepoint as well, because > the metadata file references absolute file paths. > h5. Forced to Use CLI for Disposal > Because of the scattered files, the user is in practice forced to use Flink’s > CLI to dispose a savepoint. This should be possible to handle in the scope of > the user’s environment via a file system delete operation. > h4. Proposal > In order to solve the described problems, savepoints should contain all their > state, both metadata and program state, inside a single directory. > Furthermore the metadata must only hold relative references to the checkpoint > files. This makes it obvious which files make up the state of a savepoint and > it is possible to move savepoints around by moving the savepoint directory. > h5. Desired File Layout > Triggering a savepoint to {{}} creates a directory as follows: > {code} > /savepoint-- > +-- _metadata > +-- data- [1 or more] > {code} > We include the JobID in the savepoint directory name in order to give some > hints about which job a savepoint belongs to. > h5. CLI > - Trigger: When triggering a savepoint to {{}} the savepoint > directory will be returned as the handle to the savepoint. > - Restore: Users can restore by pointing to the directory or the _metadata > file. The data files should be required to be in the same directory as the > _metadata file. > - Dispose: The disposal command should be deprecated and eventually removed. > While deprecated, disposal can happen by specifying the directory or the > _metadata file (same as restore). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5157) Extending AllWindow Function Metadata
[ https://issues.apache.org/jira/browse/FLINK-5157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15880789#comment-15880789 ] ASF GitHub Bot commented on FLINK-5157: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2946 Thanks, I'll have a look! > Extending AllWindow Function Metadata > - > > Key: FLINK-5157 > URL: https://issues.apache.org/jira/browse/FLINK-5157 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Streaming >Reporter: Ventura Del Monte >Assignee: Ventura Del Monte > > Following the logic behind [1,2], ProcessAllWindowFunction can be introduced > in Flink and AllWindowedStream can be extended in order to support them. > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata > [2] https://issues.apache.org/jira/browse/FLINK-4997 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2946: [FLINK-5157] Extend AllWindow function metadata
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2946 Thanks, I'll have a look! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---