[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299327232

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java

## @@ -30,7 +31,8 @@
 	//
 	//  Optimizer Options
 	//
-
+	@Documentation.ExcludeFromDocumentation

Review comment: These configs are hard for general users to understand, and in most scenarios there is no need to use them. Advanced users can tune the performance of some special cases with them, so I do not want to place these expert-level configs in the documentation.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dawidwys commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java
dawidwys commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java
URL: https://github.com/apache/flink/pull/8929#discussion_r299327183

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java

## @@ -38,75 +40,103 @@
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;

-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers
  * the output data type. All function calls are resolved {@link CallExpression} after applying this
- * rule except for the special case of {@link BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * This rule also resolves {@code flatten()} calls on composite types.
  *
  * If the call expects different types of arguments, but the given arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {

 	@Override
 	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
 		return expression.stream()
-			.map(expr -> expr.accept(new CallArgumentsCastingVisitor(context)))
+			.flatMap(expr -> expr.accept(new ResolvingCallVisitor(context)).stream())
 			.collect(Collectors.toList());
 	}

-	private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+	private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {

-		CallArgumentsCastingVisitor(ResolutionContext context) {
+		ResolvingCallVisitor(ResolutionContext context) {
 			super(context);
 		}

 		@Override
-		public Expression visit(UnresolvedCallExpression unresolvedCall) {
+		public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
 			final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
-				.map(c -> c.accept(this))
-				.map(e -> {
-					// special case: FLATTEN
-					// a call chain `myFunc().flatten().flatten()` is not allowed
-					if (e instanceof UnresolvedCallExpression &&
-							((UnresolvedCallExpression) e).getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-						throw new ValidationException("Consecutive flattening calls are not allowed.");
-					}
-					if (e instanceof ResolvedExpression) {
-						return (ResolvedExpression) e;
-					}
-					throw new TableException("Unexpected unresolved expression: " + e);
-				})
+				.flatMap(c -> c.accept(this).stream())
 				.collect(Collectors.toList());

-			// FLATTEN is a special case and the only call that remains unresolved after this rule
-			// it will be resolved by ResolveFlattenCallRule
 			if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.FLATTEN) {
-				return unresolvedCall.replaceArgs(new ArrayList<>(resolvedArgs));
+				return executeFlatten(resolvedArgs);
 			}

 			if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) {
 				final BuiltInFunctionDefinition definition =
 					(BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition();

 				if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) {
-					return runTypeInference(
-
[GitHub] [flink] becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-507544552

@Xeli Just to let you know, I am reviewing the patch. Hopefully we can check this in before the 1.9 release (code freeze on Jul 5), so we will be able to get more feedback as users onboard.
[jira] [Updated] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor
[ https://issues.apache.org/jira/browse/FLINK-13051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Haibo Sun updated FLINK-13051:
------------------------------
    Description: After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor.  (was: After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor.)

> Drop the non-selectable two-input StreamTask and Processor
> ----------------------------------------------------------
>
>                 Key: FLINK-13051
>                 URL: https://issues.apache.org/jira/browse/FLINK-13051
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Haibo Sun
>            Assignee: Haibo Sun
>            Priority: Major
>
> After `StreamTwoInputSelectableProcessor` supports
> `CheckpointBarrierHandler`, we should drop the non-selectable two-input
> StreamTask and Processor.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323887

## File path: flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java

## @@ -260,10 +262,26 @@ private static String toHtmlTable(final List options) {

 	private static String toHtmlString(final OptionWithMetaInfo optionWithMetaInfo) {
 		ConfigOption option = optionWithMetaInfo.option;
 		String defaultValue = stringifyDefault(optionWithMetaInfo);
+		Documentation.TableOption tableOption = optionWithMetaInfo.field.getAnnotation(Documentation.TableOption.class);
+		StringBuilder sb = new StringBuilder();
+		if (tableOption != null) {
+			Documentation.ExecMode execMode = tableOption.execMode();
+			if (Documentation.ExecMode.BATCH.equals(execMode) || Documentation.ExecMode.STREAMING.equals(execMode)) {
+				sb.append(" ")
+					.append(execMode.toString())
+					.append("");
+			} else if (Documentation.ExecMode.BOTH.equals(execMode)) {

Review comment: Fixed it.
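The hunk above appends an exec-mode badge per documented option; the original HTML strings were lost when this message was archived. A simplified, self-contained sketch of that branching logic (the ExecMode enum and the bracket markup here are illustrative stand-ins, not Flink's actual classes or HTML):

```java
// Simplified sketch of the exec-mode badge logic from the hunk above.
// ExecMode and the "[...]" markup are illustrative stand-ins; the real
// generator emits HTML spans that were stripped from this archive.
public class ExecModeBadges {

	enum ExecMode { BATCH, STREAMING, BOTH }

	static String badges(ExecMode execMode) {
		StringBuilder sb = new StringBuilder();
		if (execMode == ExecMode.BATCH || execMode == ExecMode.STREAMING) {
			// single-mode options get exactly one badge
			sb.append("[").append(execMode).append("]");
		} else if (execMode == ExecMode.BOTH) {
			// options valid in both modes get both badges
			sb.append("[BATCH]").append("[STREAMING]");
		}
		return sb.toString();
	}

	public static void main(String[] args) {
		System.out.println(badges(ExecMode.BATCH));
		System.out.println(badges(ExecMode.BOTH));
	}
}
```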
[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323832

## File path: flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java

## @@ -59,6 +59,28 @@
 	int position() default Integer.MAX_VALUE;
 }

+	/**
+	 * Annotation used on config option fields to include them in the "Table Options" section.

Review comment: Thanks, fixed it.
[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323740

## File path: docs/ops/config.md

## @@ -183,6 +183,12 @@ unless user define a `OptionsFactory` and set via `RocksDBStateBackend.setOption

 {% include generated/rocks_db_configurable_configuration.html %}

+### blink table planner

Review comment:
- Documents about blink will be introduced later; the config will then be placed together with the related API.
- Fixed it.
- I moved these to the end of this page.
[GitHub] [flink] twalthr commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java
twalthr commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java
URL: https://github.com/apache/flink/pull/8929#discussion_r299323172

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java

(The quoted hunk `@@ -38,75 +40,103 @@` is identical to the one in dawidwys's comment on this file above.)
[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299322319

## File path: flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java

## @@ -59,6 +59,28 @@
 	int position() default Integer.MAX_VALUE;
 }

+	/**
+	 * Annotation used on config option fields to include them in the "Table Options" section.
+	 *
+	 * The {@link TableOption#execMode()} argument indicates which exec mode the config works for:
+	 * batch, streaming, or both.
+	 */
+	@Target(ElementType.FIELD)
+	@Retention(RetentionPolicy.RUNTIME)
+	@Internal
+	public @interface TableOption {

Review comment: Fixed it.
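The `TableOption` annotation in the hunk above is read reflectively by the doc generator (see the `getAnnotation` call in the `ConfigOptionsDocGenerator` hunk quoted earlier). A self-contained reconstruction of that pattern, assuming the names from the diff but not using the real `org.apache.flink.annotation.docs.Documentation` class, with a hypothetical example field:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Standalone reconstruction of the TableOption annotation pattern above.
// The annotation and field names mirror the quoted diff, but this is a
// sketch, not the actual Flink Documentation class.
public class TableOptionSketch {

	enum ExecMode { BATCH, STREAMING, BOTH }

	@Target(ElementType.FIELD)
	@Retention(RetentionPolicy.RUNTIME) // RUNTIME retention so the doc generator can read it reflectively
	@interface TableOption {
		ExecMode execMode();
	}

	// Hypothetical config-option field, tagged the way PlannerConfigOptions tags its fields.
	@TableOption(execMode = ExecMode.BOTH)
	static final String SQL_OPTIMIZER_EXAMPLE_OPTION = "sql.optimizer.example";

	// Read the annotation off the field, as the doc generator does.
	static ExecMode readExecMode() {
		try {
			return TableOptionSketch.class
				.getDeclaredField("SQL_OPTIMIZER_EXAMPLE_OPTION")
				.getAnnotation(TableOption.class)
				.execMode();
		} catch (NoSuchFieldException e) {
			throw new RuntimeException(e);
		}
	}

	public static void main(String[] args) {
		System.out.println(readExecMode());
	}
}
```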
[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299322273

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java

## @@ -39,102 +41,134 @@
 			"(only count RexCall node, including leaves and interior nodes). Negative number to" +
 			" use the default threshold: double of number of nodes.");

+	@Documentation.TableOption(execMode = Documentation.ExecMode.BOTH)

Review comment: Thanks, I have removed the default.
[GitHub] [flink] XuPingyong commented on issue #8937: [FLINK-13040] promote blink table config and add to document
XuPingyong commented on issue #8937: [FLINK-13040] promote blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#issuecomment-507541037

Thanks @zentol, I have fixed it according to your comments.
[jira] [Created] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor
Haibo Sun created FLINK-13051:
---------------------------------

             Summary: Drop the non-selectable two-input StreamTask and Processor
                 Key: FLINK-13051
                 URL: https://issues.apache.org/jira/browse/FLINK-13051
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Task
            Reporter: Haibo Sun
            Assignee: Haibo Sun

After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, we should drop the non-selectable two-input StreamTask and Processor.
[GitHub] [flink] flinkbot commented on issue #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
flinkbot commented on issue #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
URL: https://github.com/apache/flink/pull/8942#issuecomment-507539575

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-13049) Port planner expressions to blink-planner from flink-planner
[ https://issues.apache.org/jira/browse/FLINK-13049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-13049:
-----------------------------------
    Labels: pull-request-available  (was: )

> Port planner expressions to blink-planner from flink-planner
> ------------------------------------------------------------
>
>                 Key: FLINK-13049
>                 URL: https://issues.apache.org/jira/browse/FLINK-13049
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
[GitHub] [flink] JingsongLi opened a new pull request #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
JingsongLi opened a new pull request #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner
URL: https://github.com/apache/flink/pull/8942

## What is the purpose of the change

Port planner expressions to blink-planner from flink-planner to support the Table API in blink-runner. We need to use the planner expressions for type inference and validation.

## Brief change log

1. Rename windowProperties and PlannerResolvedFieldReference to avoid name conflicts.
2. Port planner expressions to blink-planner from flink-planner (and change type informations to DataType and LogicalType).
3. Enable tests in KeywordParseTest and RexNodeExtractorTest.

## Verifying this change

Unit tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? (no)
[GitHub] [flink] c4emmmm commented on issue #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
c4emmmm commented on issue #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#issuecomment-507537975

> Thanks @c4emmmm.
>
> I think we should also add a `CsvTableSourceITCase` in blink-planner to verify that the lookup and scan can work well.
>
> You can refer to `org.apache.flink.table.sources.csv.CsvTableSourceITCase#testTemporalJoinCsv`.

@wuchong Thanks for your feedback. I have pushed a commit with most points fixed. Test cases will come in a follow-up commit.
[jira] [Commented] (FLINK-11637) Translate "Checkpoints" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876701#comment-16876701 ]

yelun commented on FLINK-11637:
-------------------------------
[~WangHW] Sorry, I will push it soon.

> Translate "Checkpoints" page into Chinese
> -----------------------------------------
>
>                 Key: FLINK-11637
>                 URL: https://issues.apache.org/jira/browse/FLINK-11637
>             Project: Flink
>          Issue Type: Sub-task
>          Components: chinese-translation, Documentation
>            Reporter: Congxian Qiu(klion26)
>            Assignee: yelun
>            Priority: Major
>
> doc locates in flink/docs/ops/state/checkpoints.md
[GitHub] [flink] liyafan82 commented on a change in pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join
liyafan82 commented on a change in pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join
URL: https://github.com/apache/flink/pull/8854#discussion_r299301202

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala

## @@ -163,37 +163,45 @@ class BatchExecExchange(

     val inputType = input.getOutputType.asInstanceOf[BaseRowTypeInfo]
     val outputRowType = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))

-    // TODO supports DataExchangeMode.BATCH in runtime
-    if (requiredExchangeMode.contains(DataExchangeMode.BATCH)) {
-      throw new TableException("DataExchangeMode.BATCH is not supported now")
+    val shuffleMode = requiredExchangeMode match {
+      case None => ShuffleMode.PIPELINED
+      case Some(mode) =>
+        mode match {
+          case DataExchangeMode.BATCH => ShuffleMode.BATCH
+          case DataExchangeMode.PIPELINED => ShuffleMode.PIPELINED

Review comment: How about PIPELINE_WITH_BATCH_FALLBACK?
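The hunk above maps an optional required exchange mode onto a shuffle mode, defaulting to pipelined when none is required. A standalone sketch of that mapping (the enums here are local stand-ins for Flink's `DataExchangeMode` and `ShuffleMode`, not the real classes):

```java
import java.util.Optional;

// Standalone sketch of the exchange-mode-to-shuffle-mode mapping in the
// BatchExecExchange hunk above. The enums are local stand-ins for Flink's
// DataExchangeMode and ShuffleMode.
public class ShuffleModeMapping {

	enum DataExchangeMode { BATCH, PIPELINED }
	enum ShuffleMode { BATCH, PIPELINED }

	static ShuffleMode toShuffleMode(Optional<DataExchangeMode> requiredExchangeMode) {
		// absent -> default to pipelined, mirroring the `case None` branch
		return requiredExchangeMode
			.map(mode -> mode == DataExchangeMode.BATCH ? ShuffleMode.BATCH : ShuffleMode.PIPELINED)
			.orElse(ShuffleMode.PIPELINED);
	}

	public static void main(String[] args) {
		System.out.println(toShuffleMode(Optional.empty()));
		System.out.println(toShuffleMode(Optional.of(DataExchangeMode.BATCH)));
	}
}
```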
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299042764

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java

## @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink, AppendStreamTableSink {
+	private String path;
+	private String fieldDelim;
+	private int numFiles = -1;
+	private FileSystem.WriteMode writeMode;
+
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+
+	/**
+	 * A simple [[TableSink]] to emit data as CSV files.
+	 *
+	 * @param path The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 * @param numFiles The number of files to write to
+	 * @param writeMode The write mode to specify whether existing files are overwritten or not.
+	 */
+	public CsvTableSink(
+			String path,
+			String fieldDelim,
+			int numFiles,
+			FileSystem.WriteMode writeMode) {
+		this.path = path;
+		this.fieldDelim = fieldDelim;
+		this.numFiles = numFiles;
+		this.writeMode = writeMode;
+	}
+
+	/**
+	 * A simple [[TableSink]] to emit data as CSV files using comma as field delimiter, with default
+	 * parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 */
+	public CsvTableSink(String path) {
+		this(path, ",");
+	}
+
+	/**
+	 * A simple [[TableSink]] to emit data as CSV files, with default parallelism and write mode.
+	 *
+	 * @param path The output path to write the Table to.
+	 * @param fieldDelim The field delimiter
+	 */
+	public CsvTableSink(String path, String fieldDelim) {
+		this(path, fieldDelim, -1, null);
+	}
+
+	@Override
+	public void emitDataSet(DataSet dataSet) {
+		MapOperator csvRows =
+			dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
+
+		DataSink sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		if (numFiles > 0) {
+			csvRows.setParallelism(numFiles);
+			sink.setParallelism(numFiles);
+		}
+
+		sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
+	}
+
+	@Override
+	@SuppressWarnings("")
+	public void emitDataStream(DataStream dataStream) {
+		SingleOutputStreamOperator csvRows =
+			dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
+
+		DataStreamSink sink;
+		if (writeMode != null) {
+			sink = csvRows.writeAsText(path, writeMode);
+		} else {
+			sink = csvRows.writeAsText(path);
+		}
+
+		if (numFiles > 0) {
+
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299291809

## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala

## @@ -354,34 +354,6 @@ class TableSourceTest extends TableTestBase {

   // csv builder

-  @Test
-  def testCsvTableSourceBuilder(): Unit = {

Review comment: I think we should still keep this test to verify builder behavior.
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299288495

## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java

## @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource implements
+		LookupableTableSource, ProjectableTableSource, BatchTableSource {
+
+	private final CsvInputFormatConfig config;
+
+	/**
+	 * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+	 * (logically) unlimited number of fields.
+	 *
+	 * @param path The path to the CSV file.
+	 * @param fieldNames The names of the table fields.
+	 * @param fieldTypes The types of the table fields.
+	 */
+	public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) {
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER,
+			null, false, null, false);
+	}
+
+	/**
+	 * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+	 * (logically) unlimited number of fields.
+	 *
+	 * @param path The path to the CSV file.
+	 * @param fieldNames The names of the table fields.
+	 * @param fieldTypes The types of the table fields.
+	 * @param fieldDelim The field delimiter, "," by default.
+	 * @param lineDelim The row delimiter, "\n" by default.
+	 * @param quoteCharacter An optional quote character for String values, null by default.
+	 * @param ignoreFirstLine Flag to ignore the first line, false by default.
+	 * @param ignoreComments An optional prefix to indicate comments, null by default.
+	 * @param lenient Flag to skip records with parse error instead to fail, false by default.
+	 */
+	public CsvTableSource(
+			String path,
+			String[] fieldNames,
+			TypeInformation[] fieldTypes,
+			String fieldDelim,
+			String lineDelim,
+			Character quoteCharacter,
+			boolean ignoreFirstLine,
+			String ignoreComments,
+			boolean lenient) {
+
+		this(path, fieldNames, fieldTypes,
+			IntStream.range(0, fieldNames.length).toArray(),
+			fieldDelim, lineDelim,
+			quoteCharacter, ignoreFirstLine, ignoreComments, lenient);
+	}
+
+	/**
+	 * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
+	 * (logically) unlimited number of
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299039477 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple [[TableSink]] to emit data as CSV files. 
+ */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + /** +* A simple [[TableSink]] to emit data as CSV files. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +* @param numFiles The number of files to write to +* @param writeMode The write mode to specify whether existing files are overwritten or not. +*/ + public CsvTableSink( + String path, + String fieldDelim, + int numFiles, + FileSystem.WriteMode writeMode) { + this.path = path; + this.fieldDelim = fieldDelim; + this.numFiles = numFiles; + this.writeMode = writeMode; + } + + /** +* A simple [[TableSink]] to emit data as CSV files using comma as field delimiter, with default +* parallelism and write mode. +* +* @param path The output path to write the Table to. +*/ + public CsvTableSink(String path) { + this(path, ","); + } + + /** +* A simple [[TableSink]] to emit data as CSV files, with default parallelism and write mode. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +*/ + public CsvTableSink(String path, String fieldDelim) { + this(path, fieldDelim, -1, null); + } + + @Override + public void emitDataSet(DataSet dataSet) { + MapOperator csvRows = + dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim)); + + DataSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { + csvRows.setParallelism(numFiles); + sink.setParallelism(numFiles); + } + + sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames)); + } + + @Override + @SuppressWarnings("") Review comment: Unnecessary? Remove this? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299042550 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple [[TableSink]] to emit data as CSV files. 
+ */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + /** +* A simple [[TableSink]] to emit data as CSV files. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +* @param numFiles The number of files to write to +* @param writeMode The write mode to specify whether existing files are overwritten or not. +*/ + public CsvTableSink( + String path, + String fieldDelim, + int numFiles, + FileSystem.WriteMode writeMode) { + this.path = path; + this.fieldDelim = fieldDelim; + this.numFiles = numFiles; + this.writeMode = writeMode; + } + + /** +* A simple [[TableSink]] to emit data as CSV files using comma as field delimiter, with default +* parallelism and write mode. +* +* @param path The output path to write the Table to. +*/ + public CsvTableSink(String path) { + this(path, ","); + } + + /** +* A simple [[TableSink]] to emit data as CSV files, with default parallelism and write mode. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +*/ + public CsvTableSink(String path, String fieldDelim) { + this(path, fieldDelim, -1, null); + } + + @Override + public void emitDataSet(DataSet dataSet) { + MapOperator csvRows = + dataSet.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim)); + + DataSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { + csvRows.setParallelism(numFiles); + sink.setParallelism(numFiles); + } + + sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames)); + } + + @Override + @SuppressWarnings("") + public void emitDataStream(DataStream dataStream) { + SingleOutputStreamOperator csvRows = + dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim)); + + DataStreamSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { +
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299291487 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource implements + LookupableTableSource, ProjectableTableSource, BatchTableSource { + + private final CsvInputFormatConfig config; + + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. 
+*/ + public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +* @param fieldDelim The field delimiter, "," by default. +* @param lineDelim The row delimiter, "\n" by default. +* @param quoteCharacter An optional quote character for String values, null by default. +* @param ignoreFirstLine Flag to ignore the first line, false by default. +* @param ignoreComments An optional prefix to indicate comments, null by default. +* @param lenient Flag to skip records with parse error instead to fail, false by default. +*/ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299288657 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource implements + LookupableTableSource, ProjectableTableSource, BatchTableSource { + + private final CsvInputFormatConfig config; + + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. 
+*/ + public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +* @param fieldDelim The field delimiter, "," by default. +* @param lineDelim The row delimiter, "\n" by default. +* @param quoteCharacter An optional quote character for String values, null by default. +* @param ignoreFirstLine Flag to ignore the first line, false by default. +* @param ignoreComments An optional prefix to indicate comments, null by default. +* @param lenient Flag to skip records with parse error instead to fail, false by default. +*/ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299042199 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple [[TableSink]] to emit data as CSV files. 
+ */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + /** +* A simple [[TableSink]] to emit data as CSV files. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +* @param numFiles The number of files to write to +* @param writeMode The write mode to specify whether existing files are overwritten or not. +*/ + public CsvTableSink( + String path, + String fieldDelim, + int numFiles, + FileSystem.WriteMode writeMode) { + this.path = path; + this.fieldDelim = fieldDelim; + this.numFiles = numFiles; + this.writeMode = writeMode; + } + + /** +* A simple [[TableSink]] to emit data as CSV files using comma as field delimiter, with default +* parallelism and write mode. +* +* @param path The output path to write the Table to. +*/ + public CsvTableSink(String path) { + this(path, ","); + } + + /** +* A simple [[TableSink]] to emit data as CSV files, with default parallelism and write mode. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +*/ + public CsvTableSink(String path, String fieldDelim) { + this(path, fieldDelim, -1, null); + } + + @Override + public void emitDataSet(DataSet dataSet) { + MapOperator csvRows = + dataSet.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim)); + + DataSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { + csvRows.setParallelism(numFiles); + sink.setParallelism(numFiles); + } + + sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames)); + } + + @Override + @SuppressWarnings("") + public void emitDataStream(DataStream dataStream) { + SingleOutputStreamOperator csvRows = + dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim)); + + DataStreamSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { +
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r298953595 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple [[TableSink]] to emit data as CSV files. 
+ */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + /** +* A simple [[TableSink]] to emit data as CSV files. Review comment: Replace `[[TableSink]]` with `{@link TableSink}`
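The fix the reviewer asks for is purely a doc-syntax change: `[[...]]` is Scaladoc link syntax carried over from the Scala original, which plain Javadoc renders literally instead of as a link. A minimal before/after sketch of the class comment (class body elided):

```java
// Before -- Scaladoc-style link, rendered literally by Javadoc:
//   A simple [[TableSink]] to emit data as CSV files.

/**
 * A simple {@link TableSink} to emit data as CSV files.
 */
public class CsvTableSink implements BatchTableSink, AppendStreamTableSink {
    // ...
}
```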
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299287964 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource implements + LookupableTableSource, ProjectableTableSource, BatchTableSource { + + private final CsvInputFormatConfig config; + + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. 
+*/ + public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +* @param fieldDelim The field delimiter, "," by default. +* @param lineDelim The row delimiter, "\n" by default. +* @param quoteCharacter An optional quote character for String values, null by default. +* @param ignoreFirstLine Flag to ignore the first line, false by default. +* @param ignoreComments An optional prefix to indicate comments, null by default. +* @param lenient Flag to skip records with parse error instead to fail, false by default. +*/ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299282489 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; + +/** + * A simple [[TableSink]] to emit data as CSV files. 
+ */ +public class CsvTableSink implements BatchTableSink, AppendStreamTableSink { + private String path; + private String fieldDelim; + private int numFiles = -1; + private FileSystem.WriteMode writeMode; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + /** +* A simple [[TableSink]] to emit data as CSV files. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +* @param numFiles The number of files to write to +* @param writeMode The write mode to specify whether existing files are overwritten or not. +*/ + public CsvTableSink( + String path, + String fieldDelim, + int numFiles, + FileSystem.WriteMode writeMode) { + this.path = path; + this.fieldDelim = fieldDelim; + this.numFiles = numFiles; + this.writeMode = writeMode; + } + + /** +* A simple [[TableSink]] to emit data as CSV files using comma as field delimiter, with default +* parallelism and write mode. +* +* @param path The output path to write the Table to. +*/ + public CsvTableSink(String path) { + this(path, ","); + } + + /** +* A simple [[TableSink]] to emit data as CSV files, with default parallelism and write mode. +* +* @param path The output path to write the Table to. +* @param fieldDelim The field delimiter +*/ + public CsvTableSink(String path, String fieldDelim) { + this(path, fieldDelim, -1, null); + } + + @Override + public void emitDataSet(DataSet dataSet) { + MapOperator csvRows = + dataSet.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim)); + + DataSink sink; + if (writeMode != null) { + sink = csvRows.writeAsText(path, writeMode); + } else { + sink = csvRows.writeAsText(path); + } + + if (numFiles > 0) { + csvRows.setParallelism(numFiles); + sink.setParallelism(numFiles); + } + + sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames)); + } + + @Override + @SuppressWarnings("") + public void emitDataStream(DataStream dataStream) { Review comment: Please override `DataStreamSink consumeDataStream(DataStream dataStream)` method, it's mandatory to return a `DataStreamSink` if used in blink-planner. The implementation of `emitDataStream` can simply to call `consumeDataStream`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL abo
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299290566 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource implements + LookupableTableSource, ProjectableTableSource, BatchTableSource { + + private final CsvInputFormatConfig config; + + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. 
+*/ + public CsvTableSource(String path, String[] fieldNames, TypeInformation[] fieldTypes) { + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, + null, false, null, false); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of fields. +* +* @param path The path to the CSV file. +* @param fieldNames The names of the table fields. +* @param fieldTypes The types of the table fields. +* @param fieldDelim The field delimiter, "," by default. +* @param lineDelim The row delimiter, "\n" by default. +* @param quoteCharacter An optional quote character for String values, null by default. +* @param ignoreFirstLine Flag to ignore the first line, false by default. +* @param ignoreComments An optional prefix to indicate comments, null by default. +* @param lenient Flag to skip records with parse error instead to fail, false by default. +*/ + public CsvTableSource( + String path, + String[] fieldNames, + TypeInformation[] fieldTypes, + String fieldDelim, + String lineDelim, + Character quoteCharacter, + boolean ignoreFirstLine, + String ignoreComments, + boolean lenient) { + + this(path, fieldNames, fieldTypes, + IntStream.range(0, fieldNames.length).toArray(), + fieldDelim, lineDelim, + quoteCharacter, ignoreFirstLine, ignoreComments, lenient); + } + + /** +* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a +* (logically) unlimited number of
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299288200 ## File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java ## @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.RowCsvInputFormat; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + */ +public class CsvTableSource extends InputFormatTableSource implements Review comment: missing `equals(..)` and `hashCode()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
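The missing methods the reviewer points out would typically delegate to the single immutable `config` field. A minimal runnable sketch of that pattern follows; `CsvSourceLike` is a hypothetical stand-in (the real class holds a `CsvInputFormatConfig`, here a `String` stands in), so only the equals/hashCode structure is the point.

```java
import java.util.Objects;

// Stand-in illustrating the requested equals()/hashCode(): compare the one
// immutable configuration field, and keep the two methods consistent.
class CsvSourceLike {
    private final String config;

    CsvSourceLike(String config) {
        this.config = config;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return Objects.equals(config, ((CsvSourceLike) o).config);
    }

    @Override
    public int hashCode() {
        return Objects.hash(config);
    }
}

public class EqualsSketch {
    public static void main(String[] args) {
        System.out.println(new CsvSourceLike("a").equals(new CsvSourceLike("a"))); // true
        System.out.println(new CsvSourceLike("a").equals(new CsvSourceLike("b"))); // false
    }
}
```

Basing both methods on the same field guarantees the equals/hashCode contract holds by construction.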
[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge URL: https://github.com/apache/flink/pull/8872#discussion_r299296911 ## File path: flink-python/pyflink/table/sinks.py ## @@ -49,24 +49,20 @@ class CsvTableSink(TableSink): :param write_mode: The write mode to specify whether existing files are overwritten or not. """ -def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1, +def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=-1, Review comment: @dianfu could you have a look at the pyflink changes? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] ifndef-SleePy commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig
ifndef-SleePy commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig URL: https://github.com/apache/flink/pull/8931#discussion_r299295760 ## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ## @@ -90,6 +90,8 @@ /** Defines how data exchange happens - batch or pipelined */ private ExecutionMode executionMode = ExecutionMode.PIPELINED; + private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; Review comment: Good point! It changes the default behavior of `DataStream` jobs, `EAGER` -> `LAZY_FROM_SOURCES`. Should we move `ScheduleMode` from `ExecutionConfig` to `StreamExecutionEnvironment`, since `ExecutionConfig` is also used by `DataSet`? `DataSet` needs `LAZY_FROM_SOURCES` and `DataStream` needs `EAGER` by default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
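The concern can be illustrated with stand-in types: if the shared config carries the batch-friendly default while the streaming environment owns its own default, neither API silently changes behavior. The classes below mirror names from the discussion but are hypothetical stubs, not Flink's.

```java
// Enum values as named in the discussion.
enum ScheduleMode { LAZY_FROM_SOURCES, EAGER }

class ExecutionConfigSketch {
    // Shared config keeps the DataSet-friendly default.
    ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
}

class StreamExecutionEnvironmentSketch {
    // Streaming jobs get their own EAGER default instead of
    // inheriting the shared one, preserving existing behavior.
    ScheduleMode scheduleMode = ScheduleMode.EAGER;
}

public class ScheduleModeSketch {
    public static void main(String[] args) {
        System.out.println(new ExecutionConfigSketch().scheduleMode);            // LAZY_FROM_SOURCES
        System.out.println(new StreamExecutionEnvironmentSketch().scheduleMode); // EAGER
    }
}
```

A single default in `ExecutionConfig` cannot serve both APIs, which is what motivates moving the setting.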
[GitHub] [flink] zhijiangW commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-507508370 Thanks for the updates @Aitozi ! I left some other comments for tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299290395 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.TestingConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the metrics for input buffers usage. 
+ */ +public class InputBuffersMetricsTest extends TestLogger { + + @Test + public void testCalculateTotalBuffersSize() throws IOException { + int numberOfRemoteChannels = 2; + int numberOfLocalChannels = 0; + + int numberOfBufferPerChannel = 2; + int numberOfBuffersPerGate = 8; + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNetworkBuffersPerChannel(numberOfBufferPerChannel) + .setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate) + .build(); + + Tuple3, List> tuple1 = buildInputGate( + network, + numberOfRemoteChannels, + numberOfLocalChannels); + + SingleInputGate inputGate1 = tuple1.f0; + + SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1}; + FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates); + ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates); + CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge( + floatingBuffersUsageGauge, + exclusiveBuffersUsageGauge, + inputGates); + + try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + closeableRegistry.registerCloseable(network::close); + closeableRegistry.registerCloseable(inputGate1::close); + + assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); + } + } + + @Test + public void testExclusiveBuffersUsage() throws IOException { + int numberOfRemoteChannelsGate1 = 2; + int numberOfLocalChannelsGate1 = 0; + int numberOfRemoteChannelsGate2 = 1; + int numberOfLocalChannelsGate2 = 1; + + int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2; + int 
numberOfInputGates = 2; + + int buffersPerChannel = 2; + int extraNetworkBuffersPerGate = 8; + + Closea
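For reference, the totals asserted in `testCalculateTotalBuffersSize` reduce to simple arithmetic: floating buffers are counted per gate, exclusive buffers per remote channel (in credit-based mode local channels take no exclusive network buffers), and the credit-based total is their sum. A minimal check, with variable names mirroring the test:

```java
// Recomputes the three expected values from the quoted test's parameters.
public class BufferCountCheck {
    public static void main(String[] args) {
        int numberOfRemoteChannels = 2;
        int numberOfBufferPerChannel = 2; // exclusive buffers per remote channel
        int numberOfBuffersPerGate = 8;   // floating buffers per gate

        int floatingTotal = numberOfBuffersPerGate;
        int exclusiveTotal = numberOfRemoteChannels * numberOfBufferPerChannel;
        int creditBasedTotal = exclusiveTotal + floatingTotal;

        System.out.println(floatingTotal + " " + exclusiveTotal + " " + creditBasedTotal); // 8 4 12
    }
}
```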
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299293975 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ##
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299293436 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ##
[jira] [Assigned] (FLINK-13035) LocalStreamEnvironment shall launch actuall task solts
[ https://issues.apache.org/jira/browse/FLINK-13035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wong reassigned FLINK-13035: Assignee: Wong > LocalStreamEnvironment shall launch actuall task solts > --- > > Key: FLINK-13035 > URL: https://issues.apache.org/jira/browse/FLINK-13035 > Project: Flink > Issue Type: Wish > Components: Runtime / Task >Affects Versions: 1.9.0 >Reporter: Wong >Assignee: Wong >Priority: Trivial > > When developing Flink jobs, different slot groups are sometimes used to expand threads. But currently the MiniCluster uses the default > jobGraph.getMaximumParallelism(), which is sometimes less than the actual number of slots, so the job can't launch unless > TaskManagerOptions.NUM_TASK_SLOTS is set. Is this needed? > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung commented on a change in pull request #8832: [FLINK-12937] [table-planner-blink] Introduce join reorder planner rules in blink planner
KurtYoung commented on a change in pull request #8832: [FLINK-12937] [table-planner-blink] Introduce join reorder planner rules in blink planner URL: https://github.com/apache/flink/pull/8832#discussion_r299292745 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala ## @@ -135,6 +137,24 @@ object FlinkBatchProgram { .build(), "prune empty after predicate push down") .build()) +// join reorder +if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) { + chainedProgram.addLast( +JOIN_REORDER, +FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext] + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder +.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) +.setHepMatchOrder(HepMatchOrder.BOTTOM_UP) +.add(FlinkBatchRuleSets.JOIN_REORDER_PERPARE_RULES) +.build(), "merge join into MultiJoin") + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder +.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) +.setHepMatchOrder(HepMatchOrder.BOTTOM_UP) +.add(FlinkBatchRuleSets.JOIN_REORDER_RULES) Review comment: join reorder rules are applied with hep planner? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292724

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java

## @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+	@Test
+	public void testCalculateTotalBuffersSize() throws IOException {
+		int numberOfRemoteChannels = 2;
+		int numberOfLocalChannels = 0;
+
+		int numberOfBufferPerChannel = 2;
+		int numberOfBuffersPerGate = 8;
+
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+			.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+			.build();
+
+		Tuple3, List> tuple1 = buildInputGate(
+			network,
+			numberOfRemoteChannels,
+			numberOfLocalChannels);
+
+		SingleInputGate inputGate1 = tuple1.f0;
+
+		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
+		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
+		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates);
+		CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+			floatingBuffersUsageGauge,
+			exclusiveBuffersUsageGauge,
+			inputGates);
+
+		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+
+			closeableRegistry.registerCloseable(network::close);
+			closeableRegistry.registerCloseable(inputGate1::close);
+
+			assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+		}
+	}
+
+	@Test
+	public void testExclusiveBuffersUsage() throws IOException {
+		int numberOfRemoteChannelsGate1 = 2;
+		int numberOfLocalChannelsGate1 = 0;
+		int numberOfRemoteChannelsGate2 = 1;
+		int numberOfLocalChannelsGate2 = 1;
+
+		int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2;
+		int numberOfInputGates = 2;
+
+		int buffersPerChannel = 2;
+		int extraNetworkBuffersPerGate = 8;
+
+		Closea
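The assertions in `testCalculateTotalBuffersSize` encode the buffer accounting used by credit-based flow control: exclusive buffers are allocated per *remote* channel, while floating buffers form a shared per-gate pool, and the credit-based gauge reports against the sum of both. A standalone sketch of that arithmetic (plain Java, not Flink code; the class and method names here are illustrative only):

```java
// Standalone sketch of the arithmetic behind the gauge assertions above.
// Not Flink code: class and method names are illustrative only.
public class BufferTotalsSketch {

    // Exclusive buffers exist only for remote channels; local channels read
    // from local result partitions and need no exclusive network buffers.
    static int exclusiveTotal(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    // Floating buffers are a single shared pool per input gate.
    static int floatingTotal(int floatingBuffersPerGate) {
        return floatingBuffersPerGate;
    }

    // The combined (credit-based) usage gauge measures against both pools.
    static int creditBasedTotal(int remoteChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return exclusiveTotal(remoteChannels, buffersPerChannel) + floatingTotal(floatingBuffersPerGate);
    }

    public static void main(String[] args) {
        // Same numbers as the test: 2 remote channels, 0 local channels,
        // 2 exclusive buffers per channel, 8 floating buffers per gate.
        System.out.println(exclusiveTotal(2, 2));       // 4
        System.out.println(floatingTotal(8));           // 8
        System.out.println(creditBasedTotal(2, 2, 8));  // 12
    }
}
```

With these numbers the three totals (4, 8, 12) match the three `calculateTotalBuffers` assertions in the quoted test.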
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292571

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292261

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290827

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhuzhurk commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig
zhuzhurk commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig
URL: https://github.com/apache/flink/pull/8931#discussion_r299290712

## File path: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

## @@ -90,6 +90,8 @@
 	/** Defines how data exchange happens - batch or pipelined */
 	private ExecutionMode executionMode = ExecutionMode.PIPELINED;
 
+	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

Review comment: Does this mean that if users do not specify the `ScheduleMode`, even streaming jobs will be executed in a batch style?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
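The concern behind zhuzhurk's question: `LAZY_FROM_SOURCES` deploys downstream tasks only after their inputs start producing data, which suits batch jobs, while streaming jobs need all tasks deployed up front (`EAGER`). A toy sketch (not Flink's API; the enums and the `defaultFor` helper are illustrative only) of choosing the default per job type rather than hard-coding one value:

```java
// Illustrative sketch only -- not Flink's implementation. Shows why a single
// hard-coded default is risky: a streaming job under LAZY_FROM_SOURCES would
// be scheduled batch-style, as the review comment points out.
public class ScheduleModeDefault {

    enum ScheduleMode { LAZY_FROM_SOURCES, EAGER }

    enum JobType { BATCH, STREAMING }

    // Hypothetical helper: derive the default from the job type instead of
    // using one global default for all jobs.
    static ScheduleMode defaultFor(JobType type) {
        return type == JobType.STREAMING ? ScheduleMode.EAGER : ScheduleMode.LAZY_FROM_SOURCES;
    }

    public static void main(String[] args) {
        System.out.println(defaultFor(JobType.STREAMING)); // EAGER
        System.out.println(defaultFor(JobType.BATCH));     // LAZY_FROM_SOURCES
    }
}
```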
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290642

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290478

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics] Fix the incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290395

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java (same InputBuffersMetricsTest excerpt as quoted above)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299290244 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.TestingConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the metrics for input buffers usage. 
+ */ +public class InputBuffersMetricsTest extends TestLogger { + + @Test + public void testCalculateTotalBuffersSize() throws IOException { + int numberOfRemoteChannels = 2; + int numberOfLocalChannels = 0; + + int numberOfBufferPerChannel = 2; + int numberOfBuffersPerGate = 8; + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNetworkBuffersPerChannel(numberOfBufferPerChannel) + .setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate) + .build(); + + Tuple3, List> tuple1 = buildInputGate( + network, + numberOfRemoteChannels, + numberOfLocalChannels); + + SingleInputGate inputGate1 = tuple1.f0; + + SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1}; + FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates); + ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates); + CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge( + floatingBuffersUsageGauge, + exclusiveBuffersUsageGauge, + inputGates); + + try (CloseableRegistry closeableRegistry = new CloseableRegistry()) { + + closeableRegistry.registerCloseable(network::close); + closeableRegistry.registerCloseable(inputGate1::close); + + assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); + } + } + + @Test + public void testExclusiveBuffersUsage() throws IOException { + int numberOfRemoteChannelsGate1 = 2; + int numberOfLocalChannelsGate1 = 0; + int numberOfRemoteChannelsGate2 = 1; + int numberOfLocalChannelsGate2 = 1; + + int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + numberOfRemoteChannelsGate2; + int 
numberOfInputGates = 2; + + int buffersPerChannel = 2; + int extraNetworkBuffersPerGate = 8; + + Closea
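The assertions in `testCalculateTotalBuffersSize` encode a simple accounting rule: floating buffers are pooled per gate, exclusive buffers belong only to remote channels, and the credit-based total is their sum. A minimal plain-Java sketch of that arithmetic (not Flink code; the class and method names here are illustrative):

```java
// Sketch of the buffer-accounting arithmetic that the test asserts.
public class BufferTotalsSketch {

    // Floating buffers form one shared pool per input gate.
    static int floatingTotal(int floatingBuffersPerGate) {
        return floatingBuffersPerGate;
    }

    // Exclusive buffers are allocated per remote channel; local channels get none.
    static int exclusiveTotal(int remoteChannels, int buffersPerChannel) {
        return remoteChannels * buffersPerChannel;
    }

    // The credit-based input-buffer total combines both pools.
    static int creditBasedTotal(int remoteChannels, int buffersPerChannel, int floatingBuffersPerGate) {
        return exclusiveTotal(remoteChannels, buffersPerChannel) + floatingTotal(floatingBuffersPerGate);
    }

    public static void main(String[] args) {
        // Same numbers as the test: 2 remote channels, 0 local channels,
        // 2 exclusive buffers per channel, 8 floating buffers per gate.
        System.out.println(floatingTotal(8));          // floating pool
        System.out.println(exclusiveTotal(2, 2));      // exclusive buffers
        System.out.println(creditBasedTotal(2, 2, 8)); // combined total
    }
}
```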
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299289710 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ (quotes the same InputBuffersMetricsTest.java excerpt as the first comment in this thread)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299289630 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ (quotes the same InputBuffersMetricsTest.java excerpt as the first comment in this thread)
[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI
[ https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876629#comment-16876629 ] sunjincheng commented on FLINK-13011: - Thanks [~dian.fu] :) That's great! > Release the PyFlink into PyPI > - > > Key: FLINK-13011 > URL: https://issues.apache.org/jira/browse/FLINK-13011 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Build System >Affects Versions: 1.9.0 >Reporter: sunjincheng >Priority: Major > > FLINK-12962 adds the ability to build a PyFlink distribution package, but we > have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the > PyFlink distribution package built by FLINK-12962 to PyPI. > https://pypi.org/ > https://packaging.python.org/tutorials/packaging-projects/ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299288165 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ (quotes the same InputBuffersMetricsTest.java excerpt as the first comment in this thread, up to the line) + SingleInputGate inputGate1 = tuple1.f0; Review comment: We only need `SingleInputGate` from the tuple here, so it can simply be `SingleInputGate inputGate = buildInputGate(network, numberOfRemoteChannels, numberOfLocalChannels).f0` directly. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
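The reviewer's suggestion above is a small readability refactoring: when only one field of a returned tuple is used, access it inline instead of binding the whole tuple to a local variable. A minimal sketch with a stand-in tuple type (plain Java; `Tuple3` here is a simplified substitute for Flink's class, and `buildInputGate` is a dummy factory):

```java
// Illustration of accessing .f0 directly on a tuple-returning call.
public class TupleAccessSketch {

    // Tiny stand-in for org.apache.flink.api.java.tuple.Tuple3.
    static final class Tuple3<A, B, C> {
        final A f0;
        final B f1;
        final C f2;

        Tuple3(A f0, B f1, C f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // Dummy stand-in for buildInputGate(...): returns a gate plus channel counts.
    static Tuple3<String, Integer, Integer> buildInputGate(int remoteChannels, int localChannels) {
        return new Tuple3<>("gate", remoteChannels, localChannels);
    }

    public static void main(String[] args) {
        // Before: bind the whole tuple, then read a single field.
        Tuple3<String, Integer, Integer> tuple = buildInputGate(2, 0);
        String gateVerbose = tuple.f0;

        // After (the suggested form): take .f0 directly on the call.
        String gate = buildInputGate(2, 0).f0;

        System.out.println(gateVerbose.equals(gate));
    }
}
```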
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299287676 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.TestingConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge; +import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the metrics for input buffers usage. 
+ */ +public class InputBuffersMetricsTest extends TestLogger { + + @Test + public void testCalculateTotalBuffersSize() throws IOException { + int numberOfRemoteChannels = 2; + int numberOfLocalChannels = 0; + + int buffersPerChannel = 2; + int extraNetworkBuffersPerGate = 8; + + CloseableRegistry closeableRegistry = new CloseableRegistry(); + + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNetworkBuffersPerChannel(buffersPerChannel) + .setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate) + .build(); + + Tuple3, List> tuple1 = buildInputGate( + network, + numberOfRemoteChannels, + numberOfLocalChannels); + + SingleInputGate inputGate1 = tuple1.f0; + + closeableRegistry.registerCloseable(network::close); + closeableRegistry.registerCloseable(inputGate1::close); + + try { + SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1}; + FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates); + ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new ExclusiveBuffersUsageGauge(inputGates); + CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge( + floatingBuffersUsageGauge, + exclusiveBuffersUsageGauge, + inputGates); + + assertEquals(extraNetworkBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * buffersPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1)); + assertEquals(numberOfRemoteChannels * buffersPerChannel + extraNetworkBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1)); + } finally { + closeableRegistry.close(); + } + } + + @Test + public void testExclusiveBuffersUsage() throws IOException { + int numberOfRemoteChannelsGate1 = 2; + int numberOfLocalChannelsGate1 = 0; + int numberOfRemoteChannelsGate2 = 1; + int numberOfLocalChannelsGate2 = 1; + + int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 +
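The revision quoted above still wraps the assertions in an explicit try/finally around a pre-built `CloseableRegistry`; the later revision moves to try-with-resources. A minimal sketch of why that works: a registry that is itself `Closeable` can own the cleanup. `TinyRegistry` below is a simplified stand-in, not Flink's implementation, and its reverse-registration close order is a choice made in this sketch, not a claim about Flink's class:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: try-with-resources replaces try { ... } finally { registry.close(); }.
public class RegistrySketch {

    static final class TinyRegistry implements Closeable {
        private final Deque<Closeable> resources = new ArrayDeque<>();

        void registerCloseable(Closeable c) {
            resources.push(c); // closed in reverse registration order here
        }

        @Override
        public void close() throws IOException {
            while (!resources.isEmpty()) {
                resources.pop().close();
            }
        }
    }

    static final StringBuilder log = new StringBuilder();

    public static void main(String[] args) throws IOException {
        // try-with-resources guarantees registry.close() runs even on exceptions,
        // with no explicit finally block in the test body.
        try (TinyRegistry registry = new TinyRegistry()) {
            registry.registerCloseable(() -> log.append("network;"));
            registry.registerCloseable(() -> log.append("gate;"));
        }
        System.out.println(log); // gate closed before network in this sketch
    }
}
```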
[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#discussion_r299287578 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java ## @@ -0,0 +1,316 @@ (quotes the same InputBuffersMetricsTest.java excerpt as the first comment in this thread)
[GitHub] [flink] c4emmmm commented on issue #8526: [FLINK-12597][ml] Remove the legacy flink-libraries/flink-ml
c4e commented on issue #8526: [FLINK-12597][ml] Remove the legacy flink-libraries/flink-ml URL: https://github.com/apache/flink/pull/8526#issuecomment-507495317 > @c4e Agreed, feel free to pull in my commit from #8827 or I can reopen the PR. As part of FLIP-42 we are restructuring the docs and decided to fully remove the old FlinkML documentation. If users are still using this library from an old version, they can also use an old version of the docs. I would be opposed to adding an "under construction page", I find that more confusing. Thanks. After deeper thought I also oppose that, so I have marked it with strikethrough. Since the modification is almost done in #8827, I think it's better to reopen the PR and merge it. The only docs that mention FlinkML are docs/internals/components.md and components.zh.md. You can find the keyword "FlinkML" and decide whether to remove them. @sjwiesman , would you please spend a little time doing this if @shaoxuan-wang and @zentol agree we merge this and #8827 separately? And what do you think? @shaoxuan-wang @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-13050) Counting more checkpoint failure reason in CheckpointFailureManager
[ https://issues.apache.org/jira/browse/FLINK-13050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876617#comment-16876617 ] vinoyang commented on FLINK-13050: -- cc [~pnowojski] [~azagrebin] WDYT? > Counting more checkpoint failure reason in CheckpointFailureManager > --- > > Key: FLINK-13050 > URL: https://issues.apache.org/jira/browse/FLINK-13050 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > Currently, {{CheckpointFailureManager}} counts only a few failure reasons, > to stay compatible with {{setFailOnCheckpointingErrors}}. However, > {{setFailOnCheckpointingErrors}} was deprecated in FLINK-11662. IMO, we > can count more checkpoint failure reasons. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13050) Counting more checkpoint failure reason in CheckpointFailureManager
vinoyang created FLINK-13050: Summary: Counting more checkpoint failure reason in CheckpointFailureManager Key: FLINK-13050 URL: https://issues.apache.org/jira/browse/FLINK-13050 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: vinoyang Assignee: vinoyang Currently, {{CheckpointFailureManager}} counts only a few failure reasons, to stay compatible with {{setFailOnCheckpointingErrors}}. However, {{setFailOnCheckpointingErrors}} was deprecated in FLINK-11662. IMO, we can count more checkpoint failure reasons. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
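The proposal can be sketched as a counter keyed by failure reason: only "countable" reasons increment a consecutive-failure count, and crossing a tolerance threshold fails the job. This is a hypothetical illustration of the idea, not Flink's actual `CheckpointFailureManager` API; all names below are invented:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of counting more checkpoint failure reasons.
public class FailureCountingSketch {

    // Invented reason set for illustration only.
    enum FailureReason { EXCEPTION, EXPIRED, DECLINED_SUBSUMED }

    static final class FailureCounter {
        private final int tolerableFailures;
        private final Set<FailureReason> countable;
        private int continuousFailures;

        FailureCounter(int tolerableFailures, Set<FailureReason> countable) {
            this.tolerableFailures = tolerableFailures;
            this.countable = countable;
        }

        // Returns true when the job should be failed.
        boolean onCheckpointFailure(FailureReason reason) {
            if (countable.contains(reason)) {
                continuousFailures++;
            }
            return continuousFailures > tolerableFailures;
        }

        void onCheckpointSuccess() {
            continuousFailures = 0; // a successful checkpoint resets the streak
        }
    }

    public static void main(String[] args) {
        FailureCounter counter = new FailureCounter(
            2, EnumSet.of(FailureReason.EXCEPTION, FailureReason.EXPIRED));
        System.out.println(counter.onCheckpointFailure(FailureReason.EXCEPTION));         // 1 counted
        System.out.println(counter.onCheckpointFailure(FailureReason.DECLINED_SUBSUMED)); // not counted
        System.out.println(counter.onCheckpointFailure(FailureReason.EXPIRED));           // 2 counted
        System.out.println(counter.onCheckpointFailure(FailureReason.EXCEPTION));         // 3 > 2: fail
    }
}
```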
[GitHub] [flink] liyafan82 closed pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
liyafan82 closed pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism URL: https://github.com/apache/flink/pull/8934
[GitHub] [flink] liyafan82 opened a new pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
liyafan82 opened a new pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism URL: https://github.com/apache/flink/pull/8934 ## What is the purpose of the change Resolve issue FLINK-12628 (Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism). Currently, we work around this case in Execution.getPartitionMaxParallelism because of tests: // TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs! though a partition is supposed to always have at least one consumer atm. We should check which test fails and consider fixing it. According to my investigation, there is no test failure when we ignore the case where consumers.isEmpty() is true. ## Brief change log - Change the implementation of Execution.getPartitionMaxParallelism to ignore the case where consumers.isEmpty() is true. ## Verifying this change This change is already covered by existing tests, such as ExecutionTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] liyafan82 commented on a change in pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism
liyafan82 commented on a change in pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism URL: https://github.com/apache/flink/pull/8934#discussion_r299282016 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -679,14 +678,10 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { } private static int getPartitionMaxParallelism(IntermediateResultPartition partition) { - // TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs! final List<List<ExecutionEdge>> consumers = partition.getConsumers(); - int maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM; - if (!consumers.isEmpty()) { - List<ExecutionEdge> consumer = consumers.get(0); - ExecutionJobVertex consumerVertex = consumer.get(0).getTarget().getJobVertex(); - maxParallelism = consumerVertex.getMaxParallelism(); - } + List<ExecutionEdge> consumer = consumers.get(0); Review comment: Maybe not. In my local environment, no exception was thrown when we ignored the empty-consumers case. Let's see if we can reproduce it in Travis.
[GitHub] [flink] liyafan82 commented on a change in pull request #8936: [FLINK-13043][Library / CEP] Fix the bug of parsing Dewey number from string
liyafan82 commented on a change in pull request #8936: [FLINK-13043][Library / CEP] Fix the bug of parsing Dewey number from string URL: https://github.com/apache/flink/pull/8936#discussion_r299281233 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java ## @@ -175,16 +175,18 @@ public String toString() { public static DeweyNumber fromString(final String deweyNumberString) { String[] splits = deweyNumberString.split("\\."); - if (splits.length == 0) { + if (splits.length == 1) { return new DeweyNumber(Integer.parseInt(deweyNumberString)); - } else { + } else if (splits.length > 0) { Review comment: @Myasuka thanks for the comments. In some rare cases, the length can be 0. So this condition guards against such cases.
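The review comment above turns on an edge case of Java's `String.split`: with a literal-dot pattern, a plain integer (no separator) yields a length-1 array, and the length can indeed be 0 because Java drops trailing empty strings. A minimal standalone sketch of that behavior (this is not Flink's actual `DeweyNumber` code; the class name is made up for illustration):

```java
public class SplitLengthDemo {

    // Number of tokens String.split produces for a dot-separated string.
    public static int tokens(String s) {
        // "\\." is a literal dot; trailing empty strings are removed by default
        return s.split("\\.").length;
    }

    public static void main(String[] args) {
        System.out.println(tokens("42"));    // 1: no separator, the whole string is one token
        System.out.println(tokens("1.0.2")); // 3: a well-formed Dewey number
        System.out.println(tokens("."));     // 0: both tokens are empty and get trimmed
    }
}
```

This is why `splits.length == 1` (not `== 0`) is the single-component case, while a `length > 0` guard still protects against rare inputs such as "." that collapse to an empty array.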
[GitHub] [flink] bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940#discussion_r299279990 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ## @@ -104,9 +103,19 @@ private Table convertConnectorTable(ConnectorCatalogTable table) { } private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) { + TableSource tableSource; Optional<TableFactory> tableFactory = catalog.getTableFactory(); - TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)) .orElse(TableFactoryUtil.findAndCreateTableSource(table)); + if (tableFactory.isPresent()) { + TableFactory tf = tableFactory.get(); + if (tf instanceof TableSourceFactory) { + tableSource = ((TableSourceFactory) tf).createTableSource(tablePath, table); + } else { + throw new TableException(String.format("TableFactory provided by catalog %s must implement TableSourceFactory", Review comment: minor: shall we use a similar error message to that in `convertConnectorTable()`, something like "Cannot query a sink-only table. TableFactory provided by catalog %s must implement ". Asking because when I first saw this exception, I got a bit confused about why it cannot be `TableSinkFactory`.
[jira] [Commented] (FLINK-13037) Translate "Concepts -> Glossary" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876607#comment-16876607 ] Jark Wu commented on FLINK-13037: - Hi [~knaufk], could you link the prerequisite issue which will create this page/markdown? > Translate "Concepts -> Glossary" page into Chinese > -- > > Key: FLINK-13037 > URL: https://issues.apache.org/jira/browse/FLINK-13037 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation > Reporter: Konstantin Knauf > Priority: Major >
[GitHub] [flink] ifndef-SleePy closed pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
ifndef-SleePy closed pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894
[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894#issuecomment-507490234 Abandoning this PR since we think the alternative approach is better.
[jira] [Updated] (FLINK-13037) Translate "Concepts -> Glossary" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-13037: Component/s: Documentation chinese-translation > Translate "Concepts -> Glossary" page into Chinese > -- > > Key: FLINK-13037 > URL: https://issues.apache.org/jira/browse/FLINK-13037 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation > Reporter: Konstantin Knauf > Priority: Major >
[jira] [Created] (FLINK-13049) Port planner expressions to blink-planner from flink-planner
Jingsong Lee created FLINK-13049: Summary: Port planner expressions to blink-planner from flink-planner Key: FLINK-13049 URL: https://issues.apache.org/jira/browse/FLINK-13049 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: Jingsong Lee Assignee: Jingsong Lee
[GitHub] [flink] xuefuz commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations
xuefuz commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations URL: https://github.com/apache/flink/pull/8926#issuecomment-507481283 Okay, thanks for the explanation. I guess neither matters much right now. Let's revisit the comments when we do DDL.
[GitHub] [flink] xuefuz commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
xuefuz commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940#discussion_r299262699 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ## @@ -105,8 +105,10 @@ private Table convertConnectorTable(ConnectorCatalogTable table) { private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) { Optional<TableFactory> tableFactory = catalog.getTableFactory(); - TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)) .orElse(TableFactoryUtil.findAndCreateTableSource(table)); + TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)).get(); Review comment: Good point. However, that's not what I meant to do. HiveTableFactory is responsible for generating table sources/sinks for both generic and Hive tables, so it needs to take care of both. In DatabaseCalciteSchema.convertCatalogTable(), it calls TableFactoryUtil.findAndCreateTableSource(table) only if the catalog doesn't return a table factory object. Thus, there is no duplication per se. However, I did realize that the logic here is confusing, especially with the Java Optional object. I will refactor the code a bit to make it clearer.
[GitHub] [flink] asfgit closed pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function
asfgit closed pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function URL: https://github.com/apache/flink/pull/8669
[jira] [Closed] (FLINK-11147) Add documentation for TableAggregate Function
[ https://issues.apache.org/jira/browse/FLINK-11147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11147. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 6a12908b15c398e37f8603cd84e0d30e14d07784 > Add documentation for TableAggregate Function > - > > Key: FLINK-11147 > URL: https://issues.apache.org/jira/browse/FLINK-11147 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API > Reporter: Hequn Cheng > Assignee: Hequn Cheng > Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Add documentation for {{TableAggregateFunction}}, similar to the document of > {{AggregateFunction}}: > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#aggregation-functions > Most parts of {{TableAggregateFunction}} would be the same as > {{AggregateFunction}}, except for the way outputs are handled. > {{AggregateFunction}} outputs a scalar value, while > {{TableAggregateFunction}} outputs a Table with multiple rows and columns.
[GitHub] [flink] bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations
bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations URL: https://github.com/apache/flink/pull/8926#issuecomment-507466762 Good questions. > 1. I saw a few configuration files for Hive (table/db, etc) and each has just a few constants. I'm wondering if we should just have one file that covers all those configs. Exactly. I've observed that too. I plan to merge them into a single `HiveCatalogConfig` in a follow-up. > 2. When we create Catalog objects (table/db/partition), we remove "comment" from the property map. I'm wondering how that changes desc command output for those objects because ideally comment should be shown, at least for extended mode. We only do that for table and partition. - Hive's `Database` has a `description` field to store the comment and we use that, so the output for 'DESC' is the same - Hive DDL for tables (e.g. `CREATE TABLE xx COMMENT xxx`) stores the comment as just a 'comment' property, so the output for 'DESC' is the same - AFAIK, Hive DDL doesn't support partition comments, thus it shouldn't matter how we store them in properties
[GitHub] [flink] flinkbot edited a comment on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function
flinkbot edited a comment on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function URL: https://github.com/apache/flink/pull/8669#issuecomment-500274671 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940#discussion_r299256624 ## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java ## @@ -105,8 +105,10 @@ private Table convertConnectorTable(ConnectorCatalogTable table) { private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) { Optional<TableFactory> tableFactory = catalog.getTableFactory(); - TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)) .orElse(TableFactoryUtil.findAndCreateTableSource(table)); + TableSource tableSource = tableFactory.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table)).get(); Review comment: by comparing the logic here and `HiveTableFactory.createTableSource()`, I found there are duplicated calls to `TableFactoryUtil.findAndCreateTableSource(table)`. That made me wonder whether a table factory implementation shouldn't call the generic table discovery service (a.k.a. `TableFactoryUtil.findAndCreateTableSource`), and should just create tables that are specific to that table factory itself. Thus, `HiveTableFactory.createTableSource` should just be the following: ``` @Override public TableSource createTableSource(ObjectPath tablePath, CatalogTable table) { ... if (!isGeneric) { return createInputFormatTableSource(tablePath, table); } else { // return TableFactoryUtil.findAndCreateTableSource(table); return null; } } ```
[GitHub] [flink] flinkbot commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …
flinkbot commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive … URL: https://github.com/apache/flink/pull/8941#issuecomment-507462215 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good.
[jira] [Updated] (FLINK-13048) support decimal in Flink's integration with Hive user defined functions
[ https://issues.apache.org/jira/browse/FLINK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13048: --- Labels: pull-request-available (was: ) > support decimal in Flink's integration with Hive user defined functions > --- > > Key: FLINK-13048 > URL: https://issues.apache.org/jira/browse/FLINK-13048 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 >
[GitHub] [flink] bowenli86 opened a new pull request #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …
bowenli86 opened a new pull request #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive … URL: https://github.com/apache/flink/pull/8941 …user defined functions ## What is the purpose of the change This PR adds support for decimal in Flink's integration with Hive user defined functions. ## Brief change log - added decimal conversions in util methods of `HiveInspectors` - added unit tests ## Verifying this change This change added tests and can be verified as follows: See newly added unit tests in `HiveSimpleUDFTest`, `HiveGenericUDFTest` and `HiveGenericUDAFTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] bowenli86 commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …
bowenli86 commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive … URL: https://github.com/apache/flink/pull/8941#issuecomment-507461988 cc @xuefuz @JingsongLi @lirui-apache @zjuwangg
[jira] [Created] (FLINK-13048) support decimal in Flink's integration with Hive user defined functions
Bowen Li created FLINK-13048: Summary: support decimal in Flink's integration with Hive user defined functions Key: FLINK-13048 URL: https://issues.apache.org/jira/browse/FLINK-13048 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[GitHub] [flink] xuefuz commented on a change in pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations
xuefuz commented on a change in pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations URL: https://github.com/apache/flink/pull/8926#discussion_r299243763 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -623,8 +622,10 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partition checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); checkNotNull(partition, "Partition cannot be null"); - if (!(partition instanceof HiveCatalogPartition)) { - throw new CatalogException("Currently only supports HiveCatalogPartition"); + boolean isGeneric = Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC)); + + if (isGeneric) { + throw new CatalogException("Currently only supports non-generic CatalogPartition"); Review comment: nit: The msg doesn't seem to be a complete sentence. Maybe something like "only xxx is supported currently". Same as below.
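The `Boolean.valueOf(...)` call in the diff above works even when the property is absent, because `Boolean.valueOf((String) null)` simply returns `false`. A small standalone sketch of that pattern (the key and class names here are made up, standing in for `CatalogConfig.IS_GENERIC` and the catalog's property map):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class GenericFlagDemo {

    // Hypothetical property key, standing in for CatalogConfig.IS_GENERIC.
    static final String IS_GENERIC = "is_generic";

    public static boolean isGeneric(Map<String, String> properties) {
        // Boolean.valueOf(null) is false, so a missing flag is treated as
        // non-generic without an explicit null check or risk of an NPE.
        return Boolean.valueOf(properties.get(IS_GENERIC));
    }

    public static void main(String[] args) {
        System.out.println(isGeneric(new HashMap<>()));                              // false: flag absent
        System.out.println(isGeneric(Collections.singletonMap(IS_GENERIC, "true"))); // true
    }
}
```

This makes "non-generic" the default interpretation for partitions whose properties never set the flag, which matches the exception branch in the diff.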
[GitHub] [flink] flinkbot commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
flinkbot commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940#issuecomment-507448909 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good.
[jira] [Updated] (FLINK-13047) Fix the Optional.orElse() usage issue in DatabaseCalciteSchema
[ https://issues.apache.org/jira/browse/FLINK-13047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13047: --- Labels: pull-request-available (was: ) > Fix the Optional.orElse() usage issue in DatabaseCalciteSchema > --- > > Key: FLINK-13047 > URL: https://issues.apache.org/jira/browse/FLINK-13047 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API > Affects Versions: 1.9.0 > Reporter: Xuefu Zhang > Assignee: Xuefu Zhang > Priority: Major > Labels: pull-request-available > > It's found that Optional.orElse() will evaluate the argument first before > returning Optional.get(). If the evaluation throws an exception then the call > fails even if the Optional object is nonempty. This is the case in > {{DatabaseCalciteSchema.convertCatalogTable()}}.
[GitHub] [flink] xuefuz commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
xuefuz commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940#issuecomment-507448645 cc: @bowenli86 @lirui-apache @zjuwangg
[GitHub] [flink] xuefuz opened a new pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…
xuefuz opened a new pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas… URL: https://github.com/apache/flink/pull/8940 …eCalciteSchema ## What is the purpose of the change [FLINK-13047][table] Fix the Optional.orElse() usage issue in DatabaseCalciteSchema ## Brief change log - Evaluate the expression in the orElse() clause only if the Optional object is empty. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
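The change-log item — evaluating the fallback only when the Optional is empty — amounts to switching from `orElse` to `orElseGet`. A hedged sketch with hypothetical stand-ins for the schema lookup and the conversion (the real code lives in `DatabaseCalciteSchema`):

```java
import java.util.Optional;

public class DeferredFallback {
    // Hypothetical stand-in for the table lookup; non-empty in this scenario
    static Optional<String> lookupTable() { return Optional.of("catalog_table"); }

    // Hypothetical stand-in for a fallback like convertCatalogTable() that may throw
    static String convertCatalogTable() { throw new IllegalStateException("conversion failed"); }

    public static void main(String[] args) {
        // Buggy pattern: the fallback runs (and throws) even though the Optional is non-empty
        // String table = lookupTable().orElse(convertCatalogTable());

        // Fixed pattern: the supplier is only invoked when the Optional is empty
        String table = lookupTable().orElseGet(DeferredFallback::convertCatalogTable);
        System.out.println(table);  // prints "catalog_table", no exception
    }
}
```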
[jira] [Created] (FLINK-13047) Fix the Optional.orElse() usage issue in DatabaseCalciteSchema
Xuefu Zhang created FLINK-13047: --- Summary: Fix the Optional.orElse() usage issue in DatabaseCalciteSchema Key: FLINK-13047 URL: https://issues.apache.org/jira/browse/FLINK-13047 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.9.0 Reporter: Xuefu Zhang Assignee: Xuefu Zhang It's found that Optional.orElse() will evaluate the argument first before returning Optional.get(). If the evaluation throws an exception then the call fails even if the Optional object is nonempty. This is the case in {{DatabaseCalciteSchema.convertCatalogTable()}}.
[jira] [Comment Edited] (FLINK-5478) Redis Sink Connector should allow update of command without reinstatiation
[ https://issues.apache.org/jira/browse/FLINK-5478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876472#comment-16876472 ] Ton van Bart edited comment on FLINK-5478 at 7/1/19 8:28 PM: - I think calling {{getCommandDescription()}} on every call to {{invoke()}} would be wrong, as this would mean that also the Redis command to use could change on every call, instead of only the additional key. My suggestion would be to enhance the RedisMapper with an extra method {{getAdditionalKey(T data)}} which returns {{Optional}} and give the interface a {{default}} implementation which returns {{Optional.empty()}}. This way the interface remains backwards compatible with existing implementations of it. In {{RedisSink}} the only change would be to change {{this.additionalKey}} with {{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; this also will give backwards compatibility with any existing code that uses this class. I tested this change on my fork and all the tests still pass. was (Author: tonvanbart): I think calling {{getCommandDescription()}} on every call to {{invoke()}} would be wrong, as this would mean that also the Redis command to use could change on every call. My suggestion would be to enhance the RedisMapper with an extra method {{getAdditionalKey(T data)}} which returns {{Optional}} and give the interface a {{default}} implementation which returns {{Optional.empty()}}. This way the interface remains backwards compatible with existing implementations of it. In {{RedisSink}} the only change would be to change {{this.additionalKey}} with {{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; this also will give backwards compatibility with any existing code that uses this class. I tested this change on my fork and all the tests still pass. 
> Redis Sink Connector should allow update of command without reinstatiation > -- > > Key: FLINK-5478 > URL: https://issues.apache.org/jira/browse/FLINK-5478 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.1.4 >Reporter: Atharva Inamdar >Priority: Major > > `getCommandDescription()` gets called when RedisSink is instantiated. This > happens only once and thus doesn't allow the command to be updated during run > time. > Use Case: > As a dev I want to store some data by day. So each key will have some date > specified. this will change over course of time. for example: > `counts_for_148426560` for 2017-01-13. This is not limited to any > particular command. > connector: > https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java#L114 > I wish `getCommandDescription()` could be called in `invoke()` so that the > key can be updated without having to restart. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
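The backwards-compatible extension suggested in the comment above — a `getAdditionalKey(T data)` with a `default` implementation returning `Optional.empty()`, and an `orElse(this.additionalKey)` in the sink — can be sketched as follows. Names mirror the suggestion; this is an illustration, not the actual bahir-flink `RedisMapper`/`RedisSink` code:

```java
import java.util.Optional;

// Sketch of the proposed interface extension. Existing single-method mappers
// keep compiling (and can still be written as lambdas) because the new method
// has a default implementation.
interface RedisMapper<T> {
    String getKeyFromData(T data);

    // New: optionally derive a per-record additional key
    default Optional<String> getAdditionalKey(T data) {
        return Optional.empty();
    }
}

public class RedisSinkSketch {
    // The suggested RedisSink change: prefer the per-record key when present,
    // otherwise fall back to the key configured at construction time
    static String resolveKey(RedisMapper<String> mapper, String data, String configuredKey) {
        return mapper.getAdditionalKey(data).orElse(configuredKey);
    }

    public static void main(String[] args) {
        // Legacy mapper: no override, so the configured key is used (old behavior)
        RedisMapper<String> legacy = data -> "counts";

        // New-style mapper: derives a per-day key from the record
        RedisMapper<String> perDay = new RedisMapper<String>() {
            public String getKeyFromData(String data) { return "counts"; }
            public Optional<String> getAdditionalKey(String data) {
                return Optional.of("counts_for_" + data);
            }
        };

        System.out.println(resolveKey(legacy, "2017-01-13", "counts_default"));  // counts_default
        System.out.println(resolveKey(perDay, "2017-01-13", "counts_default"));  // counts_for_2017-01-13
    }
}
```

Note that `orElse` is safe here because the fallback is a plain field, not a computation, so its eager evaluation costs nothing.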
[GitHub] [flink] flinkbot commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…
flinkbot commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons… URL: https://github.com/apache/flink/pull/8939#issuecomment-507400400 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
[jira] [Updated] (FLINK-13046) rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive
[ https://issues.apache.org/jira/browse/FLINK-13046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13046: --- Labels: pull-request-available (was: ) > rename hive-site-path to hive-conf-dir to be consistent with standard name in > Hive > -- > > Key: FLINK-13046 > URL: https://issues.apache.org/jira/browse/FLINK-13046 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0
[GitHub] [flink] bowenli86 commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…
bowenli86 commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons… URL: https://github.com/apache/flink/pull/8939#issuecomment-507400080 cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] bowenli86 opened a new pull request #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…
bowenli86 opened a new pull request #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons… URL: https://github.com/apache/flink/pull/8939 …istent with standard name in Hive ## What is the purpose of the change This PR renames the SQL CLI config key for HiveCatalog from `hive-site-path` to `hive-conf-dir`, which is consistent with the standard Hive conf key name. ## Brief change log - renamed the config from `hive-site-path` to `hive-conf-dir` - updated a few method names and yaml files of unit tests - updated documentation ## Verifying this change This change is already covered by existing tests, such as *ExecutionContextTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (docs)
[GitHub] [flink] bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations
bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations URL: https://github.com/apache/flink/pull/8926#issuecomment-507395104 cc @xuefuz @lirui-apache @zjuwangg
[jira] [Created] (FLINK-13046) rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive
Bowen Li created FLINK-13046: Summary: rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive Key: FLINK-13046 URL: https://issues.apache.org/jira/browse/FLINK-13046 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0
[GitHub] [flink] flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-506515812 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from. - Needs attention by @dawidwys [committer], @twalthr [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
[GitHub] [flink] bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager URL: https://github.com/apache/flink/pull/8920#issuecomment-507386500 @flinkbot attention @twalthr @dawidwys
[GitHub] [flink] flinkbot edited a comment on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis
flinkbot edited a comment on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis URL: https://github.com/apache/flink/pull/8911#issuecomment-506262493 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from. - Needs attention by @zentol [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.