[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299327232
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
 ##
 @@ -30,7 +31,8 @@
// 

//  Optimizer Options
// 

-
+   @Documentation.ExcludeFromDocumentation
 
 Review comment:
  These configs are hard for general users to understand, and in most scenarios there is 
no need to use them. Advanced users can tune the performance of some special 
cases with them.
  So I would prefer not to place these advanced configs in the documentation.
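
  For illustration, a minimal sketch of how such an advanced option could be hidden from the
generated documentation. The option key, field and class below are made up; only the
`Documentation.ExcludeFromDocumentation` annotation comes from the diff above.

    import org.apache.flink.annotation.docs.Documentation;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class AdvancedOptionsSketch {

        // Hypothetical advanced tuning option: excluded from the generated documentation,
        // but still available to advanced users who set it explicitly.
        @Documentation.ExcludeFromDocumentation
        public static final ConfigOption<Integer> SQL_OPTIMIZER_SOME_THRESHOLD =
            ConfigOptions.key("sql.optimizer.some-threshold")
                .defaultValue(-1)
                .withDescription("Hypothetical advanced threshold; -1 falls back to the planner default.");
    }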


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java

2019-07-01 Thread GitBox
dawidwys commented on a change in pull request #8929: [FLINK-13028][table] Move 
expression resolver to flink-table-api-java
URL: https://github.com/apache/flink/pull/8929#discussion_r299327183
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##
 @@ -38,75 +40,103 @@
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
  * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
- * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * This rule also resolves {@code flatten()} calls on composite types.
  *
  * If the call expects different types of arguments, but the given 
arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
 
@Override
	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
return expression.stream()
-   .map(expr -> expr.accept(new 
CallArgumentsCastingVisitor(context)))
+   .flatMap(expr -> expr.accept(new 
ResolvingCallVisitor(context)).stream())
.collect(Collectors.toList());
}
 
-   private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+   // 

+
+   private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {
 
-   CallArgumentsCastingVisitor(ResolutionContext context) {
+   ResolvingCallVisitor(ResolutionContext context) {
super(context);
}
 
@Override
-   public Expression visit(UnresolvedCallExpression unresolvedCall) {
+   public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
 
final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
-   .map(c -> c.accept(this))
-   .map(e -> {
-   // special case: FLATTEN
-   // a call chain 
`myFunc().flatten().flatten()` is not allowed
-   if (e instanceof 
UnresolvedCallExpression &&
-   
((UnresolvedCallExpression) e).getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-   throw new 
ValidationException("Consecutive flattening calls are not allowed.");
-   }
-   if (e instanceof ResolvedExpression) {
-   return (ResolvedExpression) e;
-   }
-   throw new TableException("Unexpected 
unresolved expression: " + e);
-   })
+   .flatMap(c -> c.accept(this).stream())
.collect(Collectors.toList());
 
-   // FLATTEN is a special case and the only call that 
remains unresolved after this rule
-   // it will be resolved by ResolveFlattenCallRule
if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-   return unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
+   return executeFlatten(resolvedArgs);
}
 
if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
final BuiltInFunctionDefinition definition =
(BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
 
if 
(definition.getTypeInference().getOutputTypeStrategy() != 
TypeStrategies.MISSING) {
-   return runTypeInference(
-

[GitHub] [flink] becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-07-01 Thread GitBox
becketqin commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-507544552
 
 
  @Xeli Just to let you know, I am reviewing the patch. Hopefully we can check 
this in before the 1.9 release (code freeze on Jul 5), so that we can get 
more feedback as users onboard.




[jira] [Updated] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor

2019-07-01 Thread Haibo Sun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haibo Sun updated FLINK-13051:
--
Description: After `StreamTwoInputSelectableProcessor` supports 
`CheckpointBarrierHandler`, we should drop the non-selectable two-input 
StreamTask and Processor.  (was: After `StreamTwoInputSelectableProcessor` 
supports `CheckpointBarrierHandler`, we should  drop the non-selectable  
two-input StreamTask and Processor.)

> Drop the non-selectable two-input StreamTask and Processor
> --
>
> Key: FLINK-13051
> URL: https://issues.apache.org/jira/browse/FLINK-13051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Haibo Sun
>Assignee: Haibo Sun
>Priority: Major
>
> After `StreamTwoInputSelectableProcessor` supports 
> `CheckpointBarrierHandler`, we should drop the non-selectable two-input 
> StreamTask and Processor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323887
 
 

 ##
 File path: 
flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
 ##
 @@ -260,10 +262,26 @@ private static String toHtmlTable(final List<OptionWithMetaInfo> options) {
private static String toHtmlString(final OptionWithMetaInfo 
optionWithMetaInfo) {
ConfigOption<?> option = optionWithMetaInfo.option;
String defaultValue = stringifyDefault(optionWithMetaInfo);
+   Documentation.TableOption tableOption = 
optionWithMetaInfo.field.getAnnotation(Documentation.TableOption.class);
+   StringBuilder sb = new StringBuilder();
+   if (tableOption != null) {
+   Documentation.ExecMode execMode = 
tableOption.execMode();
+   if (Documentation.ExecMode.BATCH.equals(execMode) || 
Documentation.ExecMode.STREAMING.equals(execMode)) {
+   sb.append(" ")
+   .append(execMode.toString())
+   .append("");
+   } else if 
(Documentation.ExecMode.BOTH.equals(execMode)) {
 
 Review comment:
   Fixed it.
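
  For context, a rough sketch of the exec-mode badge rendering this generator change is 
aiming at. The helper name and the exact markup are assumptions, not the actual Flink 
implementation; only `Documentation.TableOption` and `Documentation.ExecMode` come from 
the quoted diff.

    // Hypothetical helper: renders execution-mode badges for a documented option.
    static String renderExecModeBadges(Documentation.TableOption tableOption) {
        if (tableOption == null) {
            return "";
        }
        switch (tableOption.execMode()) {
            case BATCH:
            case STREAMING:
                // options valid in a single mode get one badge
                return " <span class=\"label\">" + tableOption.execMode() + "</span>";
            case BOTH:
                // options valid in both modes get a badge for each
                return " <span class=\"label\">BATCH</span> <span class=\"label\">STREAMING</span>";
            default:
                return "";
        }
    }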




[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323832
 
 

 ##
 File path: 
flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
 ##
 @@ -59,6 +59,28 @@
int position() default Integer.MAX_VALUE;
}
 
+   /**
+* Annotation used on config option fields to include them in the 
"Table Options" section.
 
 Review comment:
   Thanks, fixed it.




[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299323740
 
 

 ##
 File path: docs/ops/config.md
 ##
 @@ -183,6 +183,12 @@ unless user define a `OptionsFactory` and set via 
`RocksDBStateBackend.setOption
 
 {% include generated/rocks_db_configurable_configuration.html %}
 
+### blink table planner
 
 Review comment:
  - Documentation about the Blink planner will be introduced later; the config will then 
be placed together with the related API.
  - Fixed it.
  - I moved these to the end of this page.




[GitHub] [flink] twalthr commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java

2019-07-01 Thread GitBox
twalthr commented on a change in pull request #8929: [FLINK-13028][table] Move 
expression resolver to flink-table-api-java
URL: https://github.com/apache/flink/pull/8929#discussion_r299323172
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##
 @@ -38,75 +40,103 @@
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
  * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
- * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * This rule also resolves {@code flatten()} calls on composite types.
  *
  * If the call expects different types of arguments, but the given 
arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
 
@Override
	public List<Expression> apply(List<Expression> expression, ResolutionContext context) {
return expression.stream()
-   .map(expr -> expr.accept(new 
CallArgumentsCastingVisitor(context)))
+   .flatMap(expr -> expr.accept(new 
ResolvingCallVisitor(context)).stream())
.collect(Collectors.toList());
}
 
-   private class CallArgumentsCastingVisitor extends RuleExpressionVisitor<Expression> {
+   // 

+
+   private class ResolvingCallVisitor extends RuleExpressionVisitor<List<ResolvedExpression>> {
 
-   CallArgumentsCastingVisitor(ResolutionContext context) {
+   ResolvingCallVisitor(ResolutionContext context) {
super(context);
}
 
@Override
-   public Expression visit(UnresolvedCallExpression unresolvedCall) {
+   public List<ResolvedExpression> visit(UnresolvedCallExpression unresolvedCall) {
 
final List<ResolvedExpression> resolvedArgs = unresolvedCall.getChildren().stream()
-   .map(c -> c.accept(this))
-   .map(e -> {
-   // special case: FLATTEN
-   // a call chain 
`myFunc().flatten().flatten()` is not allowed
-   if (e instanceof 
UnresolvedCallExpression &&
-   
((UnresolvedCallExpression) e).getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-   throw new 
ValidationException("Consecutive flattening calls are not allowed.");
-   }
-   if (e instanceof ResolvedExpression) {
-   return (ResolvedExpression) e;
-   }
-   throw new TableException("Unexpected 
unresolved expression: " + e);
-   })
+   .flatMap(c -> c.accept(this).stream())
.collect(Collectors.toList());
 
-   // FLATTEN is a special case and the only call that 
remains unresolved after this rule
-   // it will be resolved by ResolveFlattenCallRule
if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-   return unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
+   return executeFlatten(resolvedArgs);
}
 
if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
final BuiltInFunctionDefinition definition =
(BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
 
if 
(definition.getTypeInference().getOutputTypeStrategy() != 
TypeStrategies.MISSING) {
-   return runTypeInference(
- 

[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299322319
 
 

 ##
 File path: 
flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
 ##
 @@ -59,6 +59,28 @@
int position() default Integer.MAX_VALUE;
}
 
+   /**
+* Annotation used on config option fields to include them in the 
"Table Options" section.
+*
+* The {@link TableOption#execMode()} argument indicate which exec 
mode the config works for,
+* for batch, streaming or both.
+*
+*/
+   @Target(ElementType.FIELD)
+   @Retention(RetentionPolicy.RUNTIME)
+   @Internal
+   public @interface TableOption {
+
 
 Review comment:
   Fixed it.
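
  As a usage illustration, a hypothetical config option annotated for the "Table Options" 
section. The key, type and default below are made up; only the annotation and its 
`execMode` attribute come from the diff above.

    import org.apache.flink.annotation.docs.Documentation;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class TableOptionUsageSketch {

        // Documented under "Table Options" and tagged as a batch-only option.
        @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
        public static final ConfigOption<Long> SQL_EXEC_SORT_BUFFER_MEMORY =
            ConfigOptions.key("sql.exec.sort.buffer-memory")
                .defaultValue(256L)
                .withDescription("Hypothetical example: memory (in MB) reserved for sort buffers in batch mode.");
    }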




[GitHub] [flink] XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on a change in pull request #8937: [FLINK-13040] promote 
blink table config and add to document
URL: https://github.com/apache/flink/pull/8937#discussion_r299322273
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
 ##
 @@ -39,102 +41,134 @@
"(only count RexCall 
node, including leaves and interior nodes). Negative number to" +
" use the default 
threshold: double of number of nodes.");
 
+   @Documentation.TableOption(execMode = Documentation.ExecMode.BOTH)
 
 Review comment:
  Thanks, I have removed the default.




[GitHub] [flink] XuPingyong commented on issue #8937: [FLINK-13040] promote blink table config and add to document

2019-07-01 Thread GitBox
XuPingyong commented on issue #8937: [FLINK-13040] promote blink table config 
and add to document
URL: https://github.com/apache/flink/pull/8937#issuecomment-507541037
 
 
  Thanks @zentol, I have fixed it according to your comments.




[jira] [Created] (FLINK-13051) Drop the non-selectable two-input StreamTask and Processor

2019-07-01 Thread Haibo Sun (JIRA)
Haibo Sun created FLINK-13051:
-

 Summary: Drop the non-selectable two-input StreamTask and Processor
 Key: FLINK-13051
 URL: https://issues.apache.org/jira/browse/FLINK-13051
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Haibo Sun
Assignee: Haibo Sun


After `StreamTwoInputSelectableProcessor` supports `CheckpointBarrierHandler`, 
we should drop the non-selectable two-input StreamTask and Processor.





[GitHub] [flink] flinkbot commented on issue #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner

2019-07-01 Thread GitBox
flinkbot commented on issue #8942: [FLINK-13049][table-planner-blink] Port 
planner expressions to blink-planner from flink-planner
URL: https://github.com/apache/flink/pull/8942#issuecomment-507539575
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-13049) Port planner expressions to blink-planner from flink-planner

2019-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13049:
---
Labels: pull-request-available  (was: )

> Port planner expressions to blink-planner from flink-planner
> 
>
> Key: FLINK-13049
> URL: https://issues.apache.org/jira/browse/FLINK-13049
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] JingsongLi opened a new pull request #8942: [FLINK-13049][table-planner-blink] Port planner expressions to blink-planner from flink-planner

2019-07-01 Thread GitBox
JingsongLi opened a new pull request #8942: [FLINK-13049][table-planner-blink] 
Port planner expressions to blink-planner from flink-planner
URL: https://github.com/apache/flink/pull/8942
 
 
   
   ## What is the purpose of the change
   
   Port planner expressions to blink-planner from flink-planner to support the 
Table API in the blink runner.
   We need to use the planner expressions for type inference and validation.
   
   ## Brief change log
   
   1. Rename windowProperties and PlannerResolvedFieldReference to avoid name 
conflicts.
   2. Port planner expressions to blink-planner from flink-planner (and change 
type information to DataType and LogicalType); a conversion sketch follows this 
description.
   3. Enable tests in KeywordParseTest and RexNodeExtractorTest.
   
   ## Verifying this change
   
   Unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
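
   As mentioned in point 2 of the change log, the ported expressions work on 
DataType/LogicalType instead of TypeInformation. A minimal sketch of the kind of 
conversion involved, using the existing TypeConversions utilities; the concrete types 
chosen are only illustrative.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.utils.TypeConversions;

    public class TypeMigrationSketch {
        public static void main(String[] args) {
            // Legacy planner expressions carried TypeInformation ...
            TypeInformation<String> legacyType = Types.STRING;

            // ... while the ported blink-planner expressions work on DataType / LogicalType.
            DataType dataType = TypeConversions.fromLegacyInfoToDataType(legacyType);
            LogicalType logicalType = dataType.getLogicalType();

            // Converting back is possible where a legacy TypeInformation still exists for the type.
            TypeInformation<?> roundTrip = TypeConversions.fromDataTypeToLegacyInfo(dataType);

            System.out.println(logicalType + " / " + roundTrip);
        }
    }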




[GitHub] [flink] c4emmmm commented on issue #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
c4emmmm commented on issue #8872: [FLINK-12977][table]Port CsvTableSource to 
api-java-bridge
URL: https://github.com/apache/flink/pull/8872#issuecomment-507537975
 
 
   > Thanks @c4emmmm.
   > 
   > I think we should also add a `CsvTableSourceITCase` in blink-planner to 
verify the lookup and scan can work well.
   > 
   > You can refer to 
`org.apache.flink.table.sources.csv.CsvTableSourceITCase#testTemporalJoinCsv`.
   
   @wuchong Thanks for your feedback. I have pushed a commit fixing most of the points. 
Test cases will follow in a subsequent commit.




[jira] [Commented] (FLINK-11637) Translate "Checkpoints" page into Chinese

2019-07-01 Thread yelun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876701#comment-16876701
 ] 

yelun commented on FLINK-11637:
---

[~WangHW] Sorry, I will push it soon.

> Translate "Checkpoints" page into Chinese
> -
>
> Key: FLINK-11637
> URL: https://issues.apache.org/jira/browse/FLINK-11637
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: yelun
>Priority: Major
>
> doc locates in flink/docs/ops/state/checkpoints.md





[GitHub] [flink] liyafan82 commented on a change in pull request #8854: [FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in blink and implement hash join

2019-07-01 Thread GitBox
liyafan82 commented on a change in pull request #8854: 
[FLINK-12959][table-planner-blink] Use BoundedInput and InputSelectable in 
blink and implement hash join
URL: https://github.com/apache/flink/pull/8854#discussion_r299301202
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala
 ##
 @@ -163,37 +163,45 @@ class BatchExecExchange(
 val inputType = input.getOutputType.asInstanceOf[BaseRowTypeInfo]
 val outputRowType = 
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
 
-// TODO supports DataExchangeMode.BATCH in runtime
-if (requiredExchangeMode.contains(DataExchangeMode.BATCH)) {
-  throw new TableException("DataExchangeMode.BATCH is not supported now")
+val shuffleMode = requiredExchangeMode match {
+  case None => ShuffleMode.PIPELINED
+  case Some(mode) =>
+mode match {
+  case DataExchangeMode.BATCH => ShuffleMode.BATCH
+  case DataExchangeMode.PIPELINED => ShuffleMode.PIPELINED
 
 Review comment:
   how about PIPELINE_WITH_BATCH_FALLBACK?




[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299042764
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+* @param numFiles   The number of files to write to
+* @param writeMode  The write mode to specify whether existing files 
are overwritten or not.
+*/
+   public CsvTableSink(
+   String path,
+   String fieldDelim,
+   int numFiles,
+   FileSystem.WriteMode writeMode) {
+   this.path = path;
+   this.fieldDelim = fieldDelim;
+   this.numFiles = numFiles;
+   this.writeMode = writeMode;
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files using comma as 
field delimiter, with default
+* parallelism and write mode.
+*
+* @param path The output path to write the Table to.
+*/
+   public CsvTableSink(String path) {
+   this(path, ",");
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files, with default 
parallelism and write mode.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+*/
+   public CsvTableSink(String path, String fieldDelim) {
+   this(path, fieldDelim, -1, null);
+   }
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   MapOperator csvRows =
+   dataSet.map(new CsvFormatter(fieldDelim == null ? "," : 
fieldDelim));
+
+   DataSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+   csvRows.setParallelism(numFiles);
+   sink.setParallelism(numFiles);
+   }
+
+   
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
+   }
+
+   @Override
+   @SuppressWarnings("")
+   public void emitDataStream(DataStream dataStream) {
+   SingleOutputStreamOperator csvRows =
+   dataStream.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim));
+
+   DataStreamSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+ 

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299291809
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
 ##
 @@ -354,34 +354,6 @@ class TableSourceTest extends TableTestBase {
 
   // csv builder
 
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
 
 Review comment:
   I think we should still keep this test to verify builder behavior.
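
  For reference, a minimal sketch of the builder usage such a test would cover, assuming 
the ported Java builder keeps the existing builder API; the path, fields and delimiter 
below are made up.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.sources.CsvTableSource;

    public class CsvTableSourceBuilderSketch {
        public static void main(String[] args) {
            // Builder-based construction that a builder test would exercise.
            CsvTableSource source = CsvTableSource.builder()
                .path("/tmp/people.csv")           // hypothetical input file
                .field("name", Types.STRING)
                .field("age", Types.INT)
                .fieldDelimiter("|")
                .ignoreFirstLine()
                .ignoreParseErrors()
                .build();

            System.out.println(source.explainSource());
        }
    }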




[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299288495
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource<Row> implements
+   LookupableTableSource<Row>, ProjectableTableSource<Row>, BatchTableSource<Row> {
+
+   private final CsvInputFormatConfig config;
+
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+*/
+   public CsvTableSource(String path, String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+   null, false, null, false);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+* @param fieldDelim The field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacter An optional quote character for String values, 
null by default.
+* @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+* @param ignoreComments An optional prefix to indicate comments, null 
by default.
+* @param lenient Flag to skip records with parse error instead to 
fail, false by default.
+*/
+   public CsvTableSource(
+   String path,
+   String[] fieldNames,
+   TypeInformation[] fieldTypes,
+   String fieldDelim,
+   String lineDelim,
+   Character quoteCharacter,
+   boolean ignoreFirstLine,
+   String ignoreComments,
+   boolean lenient) {
+
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   fieldDelim, lineDelim,
+   quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299039477
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+* @param numFiles   The number of files to write to
+* @param writeMode  The write mode to specify whether existing files 
are overwritten or not.
+*/
+   public CsvTableSink(
+   String path,
+   String fieldDelim,
+   int numFiles,
+   FileSystem.WriteMode writeMode) {
+   this.path = path;
+   this.fieldDelim = fieldDelim;
+   this.numFiles = numFiles;
+   this.writeMode = writeMode;
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files using comma as 
field delimiter, with default
+* parallelism and write mode.
+*
+* @param path The output path to write the Table to.
+*/
+   public CsvTableSink(String path) {
+   this(path, ",");
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files, with default 
parallelism and write mode.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+*/
+   public CsvTableSink(String path, String fieldDelim) {
+   this(path, fieldDelim, -1, null);
+   }
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   MapOperator csvRows =
+   dataSet.map(new CsvFormatter(fieldDelim == null ? "," : 
fieldDelim));
+
+   DataSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+   csvRows.setParallelism(numFiles);
+   sink.setParallelism(numFiles);
+   }
+
+   
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
+   }
+
+   @Override
+   @SuppressWarnings("")
 
 Review comment:
   Unnecessary? Remove this?




[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299042550
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+* @param numFiles   The number of files to write to
+* @param writeMode  The write mode to specify whether existing files 
are overwritten or not.
+*/
+   public CsvTableSink(
+   String path,
+   String fieldDelim,
+   int numFiles,
+   FileSystem.WriteMode writeMode) {
+   this.path = path;
+   this.fieldDelim = fieldDelim;
+   this.numFiles = numFiles;
+   this.writeMode = writeMode;
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files using comma as 
field delimiter, with default
+* parallelism and write mode.
+*
+* @param path The output path to write the Table to.
+*/
+   public CsvTableSink(String path) {
+   this(path, ",");
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files, with default 
parallelism and write mode.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+*/
+   public CsvTableSink(String path, String fieldDelim) {
+   this(path, fieldDelim, -1, null);
+   }
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   MapOperator csvRows =
+   dataSet.map(new CsvFormatter(fieldDelim == null ? "," : 
fieldDelim));
+
+   DataSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+   csvRows.setParallelism(numFiles);
+   sink.setParallelism(numFiles);
+   }
+
+   
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
+   }
+
+   @Override
+   @SuppressWarnings("")
+   public void emitDataStream(DataStream dataStream) {
+   SingleOutputStreamOperator csvRows =
+   dataStream.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim));
+
+   DataStreamSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+ 

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299291487
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource<Row> implements
+   LookupableTableSource<Row>, ProjectableTableSource<Row>, BatchTableSource<Row> {
+
+   private final CsvInputFormatConfig config;
+
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+*/
+   public CsvTableSource(String path, String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+   null, false, null, false);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+* @param fieldDelim The field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacter An optional quote character for String values, 
null by default.
+* @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+* @param ignoreComments An optional prefix to indicate comments, null 
by default.
+* @param lenient Flag to skip records with parse error instead to 
fail, false by default.
+*/
+   public CsvTableSource(
+   String path,
+   String[] fieldNames,
+   TypeInformation[] fieldTypes,
+   String fieldDelim,
+   String lineDelim,
+   Character quoteCharacter,
+   boolean ignoreFirstLine,
+   String ignoreComments,
+   boolean lenient) {
+
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   fieldDelim, lineDelim,
+   quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299288657
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource<Row> implements
+   LookupableTableSource<Row>, ProjectableTableSource<Row>, BatchTableSource<Row> {
+
+   private final CsvInputFormatConfig config;
+
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+*/
+   public CsvTableSource(String path, String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+   null, false, null, false);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+* @param fieldDelim The field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacter An optional quote character for String values, 
null by default.
+* @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+* @param ignoreComments An optional prefix to indicate comments, null 
by default.
+* @param lenient Flag to skip records with parse error instead to 
fail, false by default.
+*/
+   public CsvTableSource(
+   String path,
+   String[] fieldNames,
+   TypeInformation[] fieldTypes,
+   String fieldDelim,
+   String lineDelim,
+   Character quoteCharacter,
+   boolean ignoreFirstLine,
+   String ignoreComments,
+   boolean lenient) {
+
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   fieldDelim, lineDelim,
+   quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299042199
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+* @param numFiles   The number of files to write to
+* @param writeMode  The write mode to specify whether existing files 
are overwritten or not.
+*/
+   public CsvTableSink(
+   String path,
+   String fieldDelim,
+   int numFiles,
+   FileSystem.WriteMode writeMode) {
+   this.path = path;
+   this.fieldDelim = fieldDelim;
+   this.numFiles = numFiles;
+   this.writeMode = writeMode;
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files using comma as 
field delimiter, with default
+* parallelism and write mode.
+*
+* @param path The output path to write the Table to.
+*/
+   public CsvTableSink(String path) {
+   this(path, ",");
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files, with default 
parallelism and write mode.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+*/
+   public CsvTableSink(String path, String fieldDelim) {
+   this(path, fieldDelim, -1, null);
+   }
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   MapOperator csvRows =
+   dataSet.map(new CsvFormatter(fieldDelim == null ? "," : 
fieldDelim));
+
+   DataSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+   csvRows.setParallelism(numFiles);
+   sink.setParallelism(numFiles);
+   }
+
+   
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
+   }
+
+   @Override
+   @SuppressWarnings("")
+   public void emitDataStream(DataStream dataStream) {
+   SingleOutputStreamOperator csvRows =
+   dataStream.map(new CsvFormatter(fieldDelim == null ? 
"," : fieldDelim));
+
+   DataStreamSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+ 

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r298953595
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink, 
AppendStreamTableSink {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
 
 Review comment:
   Replace `[[TableSink]]` by `{@link TableSink}`
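   For example, the class comment quoted above would then read roughly as follows (a sketch only; the `Row` type parameters dropped by the mail rendering are assumed here):
   
   /**
    * A simple {@link TableSink} to emit data as CSV files.
    */
   public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
       // ...
   }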


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299287964
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource implements
+   LookupableTableSource, ProjectableTableSource, 
BatchTableSource {
+
+   private final CsvInputFormatConfig config;
+
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+*/
+   public CsvTableSource(String path, String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+   null, false, null, false);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+* @param fieldDelim The field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacter An optional quote character for String values, 
null by default.
+* @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+* @param ignoreComments An optional prefix to indicate comments, null 
by default.
+* @param lenient Flag to skip records with parse errors instead of 
failing, false by default.
+*/
+   public CsvTableSource(
+   String path,
+   String[] fieldNames,
+   TypeInformation[] fieldTypes,
+   String fieldDelim,
+   String lineDelim,
+   Character quoteCharacter,
+   boolean ignoreFirstLine,
+   String ignoreComments,
+   boolean lenient) {
+
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   fieldDelim, lineDelim,
+   quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299282489
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+/**
+ * A simple [[TableSink]] to emit data as CSV files.
+ */
+public class CsvTableSink implements BatchTableSink, 
AppendStreamTableSink {
+   private String path;
+   private String fieldDelim;
+   private int numFiles = -1;
+   private FileSystem.WriteMode writeMode;
+
+   private String[] fieldNames;
+   private TypeInformation[] fieldTypes;
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+* @param numFiles   The number of files to write to
+* @param writeMode  The write mode to specify whether existing files 
are overwritten or not.
+*/
+   public CsvTableSink(
+   String path,
+   String fieldDelim,
+   int numFiles,
+   FileSystem.WriteMode writeMode) {
+   this.path = path;
+   this.fieldDelim = fieldDelim;
+   this.numFiles = numFiles;
+   this.writeMode = writeMode;
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files using comma as 
field delimiter, with default
+* parallelism and write mode.
+*
+* @param path The output path to write the Table to.
+*/
+   public CsvTableSink(String path) {
+   this(path, ",");
+   }
+
+   /**
+* A simple [[TableSink]] to emit data as CSV files, with default 
parallelism and write mode.
+*
+* @param path   The output path to write the Table to.
+* @param fieldDelim The field delimiter
+*/
+   public CsvTableSink(String path, String fieldDelim) {
+   this(path, fieldDelim, -1, null);
+   }
+
+   @Override
+   public void emitDataSet(DataSet dataSet) {
+   MapOperator csvRows =
+   dataSet.map(new CsvFormatter(fieldDelim == null ? "," : 
fieldDelim));
+
+   DataSink sink;
+   if (writeMode != null) {
+   sink = csvRows.writeAsText(path, writeMode);
+   } else {
+   sink = csvRows.writeAsText(path);
+   }
+
+   if (numFiles > 0) {
+   csvRows.setParallelism(numFiles);
+   sink.setParallelism(numFiles);
+   }
+
+   
sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, 
fieldNames));
+   }
+
+   @Override
+   @SuppressWarnings("")
+   public void emitDataStream(DataStream dataStream) {
 
 Review comment:
   Please override the `DataStreamSink consumeDataStream(DataStream 
dataStream)` method; it is mandatory to return a `DataStreamSink` when the sink 
is used with the blink planner.
   
   The implementation of `emitDataStream` can simply call 
`consumeDataStream`.
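   For reference, a rough sketch of that change, reusing the body of `emitDataStream` quoted above (illustrative only; it assumes the existing fields, the `CsvFormatter` helper, and the `Row`/`String` type parameters that the mail rendering dropped):
   
   @Override
   public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
       // Same formatting logic as the quoted emitDataStream body.
       SingleOutputStreamOperator<String> csvRows =
           dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
   
       DataStreamSink<String> sink = writeMode != null
           ? csvRows.writeAsText(path, writeMode)
           : csvRows.writeAsText(path);
   
       if (numFiles > 0) {
           csvRows.setParallelism(numFiles);
           sink.setParallelism(numFiles);
       }
       return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
   }
   
   @Override
   public void emitDataStream(DataStream<Row> dataStream) {
       // Deprecated entry point simply delegates to the new method.
       consumeDataStream(dataStream);
   }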


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299290566
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource implements
+   LookupableTableSource, ProjectableTableSource, 
BatchTableSource {
+
+   private final CsvInputFormatConfig config;
+
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+*/
+   public CsvTableSource(String path, String[] fieldNames, 
TypeInformation[] fieldTypes) {
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   CsvInputFormat.DEFAULT_FIELD_DELIMITER, 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
+   null, false, null, false);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of fields.
+*
+* @param path The path to the CSV file.
+* @param fieldNames The names of the table fields.
+* @param fieldTypes The types of the table fields.
+* @param fieldDelim The field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacter An optional quote character for String values, 
null by default.
+* @param ignoreFirstLine Flag to ignore the first line, false by 
default.
+* @param ignoreComments An optional prefix to indicate comments, null 
by default.
+* @param lenient Flag to skip records with parse errors instead of 
failing, false by default.
+*/
+   public CsvTableSource(
+   String path,
+   String[] fieldNames,
+   TypeInformation[] fieldTypes,
+   String fieldDelim,
+   String lineDelim,
+   Character quoteCharacter,
+   boolean ignoreFirstLine,
+   String ignoreComments,
+   boolean lenient) {
+
+   this(path, fieldNames, fieldTypes,
+   IntStream.range(0, fieldNames.length).toArray(),
+   fieldDelim, lineDelim,
+   quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
+   }
+
+   /**
+* A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV 
files with a
+* (logically) unlimited number of

[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299288200
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.RowCsvInputFormat;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+/**
+ * A [[InputFormatTableSource]] and [[LookupableTableSource]] for simple CSV 
files with a
+ * (logically) unlimited number of fields.
+ */
+public class CsvTableSource extends InputFormatTableSource implements
 
 Review comment:
   missing `equals(..)` and `hashCode()`.
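   One possible shape for these methods, assuming the comparison only needs the `config` field shown in the quoted diff and that `CsvInputFormatConfig` implements them itself (sketch only, uses `java.util.Objects`):
   
   @Override
   public boolean equals(Object o) {
       if (this == o) {
           return true;
       }
       if (o == null || getClass() != o.getClass()) {
           return false;
       }
       // config is the only state held by CsvTableSource in the quoted diff.
       return Objects.equals(config, ((CsvTableSource) o).config);
   }
   
   @Override
   public int hashCode() {
       return Objects.hash(config);
   }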


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port CsvTableSource to api-java-bridge

2019-07-01 Thread GitBox
wuchong commented on a change in pull request #8872: [FLINK-12977][table]Port 
CsvTableSource to api-java-bridge
URL: https://github.com/apache/flink/pull/8872#discussion_r299296911
 
 

 ##
 File path: flink-python/pyflink/table/sinks.py
 ##
 @@ -49,24 +49,20 @@ class CsvTableSink(TableSink):
 :param write_mode: The write mode to specify whether existing files are 
overwritten or not.
 """
 
-def __init__(self, field_names, field_types, path, field_delimiter=',', 
num_files=1,
+def __init__(self, field_names, field_types, path, field_delimiter=',', 
num_files=-1,
 
 Review comment:
   @dianfu could you have a look at the pyflink changes?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig

2019-07-01 Thread GitBox
ifndef-SleePy commented on a change in pull request #8931: [FLINK-13041] Make 
ScheduleMode configurable via ExecutionConfig
URL: https://github.com/apache/flink/pull/8931#discussion_r299295760
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
 ##
 @@ -90,6 +90,8 @@
/** Defines how data exchange happens - batch or pipelined */
private ExecutionMode executionMode = ExecutionMode.PIPELINED;
 
+   private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
 Review comment:
   Good point!
   It changes the default behavior of `DataStream` jobs from `EAGER` to 
`LAZY_FROM_SOURCES`.
   Should we move `ScheduleMode` from `ExecutionConfig` to 
`StreamExecutionEnvironment`, since `ExecutionConfig` is also used by `DataSet`? 
`DataSet` needs `LAZY_FROM_SOURCES` and `DataStream` needs `EAGER` by default. 
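   Just to illustrate the idea (the names below are hypothetical, not an actual API proposal), the per-API defaults could look roughly like this:
   
   // Hypothetical sketch: the environment owns its own default ScheduleMode
   // instead of ExecutionConfig carrying a single value for both APIs.
   public class StreamExecutionEnvironment {
       // DataStream jobs keep EAGER scheduling by default.
       private ScheduleMode scheduleMode = ScheduleMode.EAGER;
   
       public void setScheduleMode(ScheduleMode scheduleMode) {
           this.scheduleMode = scheduleMode;
       }
   
       public ScheduleMode getScheduleMode() {
           return scheduleMode;
       }
   }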


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on issue #8455: [FLINK-12284][Network,Metrics]Fix the 
incorrect inputBufferUsage metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-507508370
 
 
   Thanks for the updates @Aitozi! I left some other comments on the tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290395
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299293975
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299293436
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[jira] [Assigned] (FLINK-13035) LocalStreamEnvironment shall launch actual task slots

2019-07-01 Thread Wong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wong reassigned FLINK-13035:


Assignee: Wong

> LocalStreamEnvironment shall launch actual task slots 
> ---
>
> Key: FLINK-13035
> URL: https://issues.apache.org/jira/browse/FLINK-13035
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Task
>Affects Versions: 1.9.0
>Reporter: Wong
>Assignee: Wong
>Priority: Trivial
>
> When developing flink jobs, there is some times use different soltgroup to 
> expand threads.But now minicluster use default 
> jobGraph.getMaximumParallelism(), sometimes is less than actual solts,so it 
> can‘’t lanch job if not set TaskManagerOptions.NUM_TASK_SLOTS  . Is this 
> needed?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung commented on a change in pull request #8832: [FLINK-12937] [table-planner-blink] Introduce join reorder planner rules in blink planner

2019-07-01 Thread GitBox
KurtYoung commented on a change in pull request #8832: [FLINK-12937] 
[table-planner-blink] Introduce join reorder planner rules in blink planner
URL: https://github.com/apache/flink/pull/8832#discussion_r299292745
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala
 ##
 @@ -135,6 +137,24 @@ object FlinkBatchProgram {
 .build(), "prune empty after predicate push down")
 .build())
 
+// join reorder
+if 
(config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) {
+  chainedProgram.addLast(
+JOIN_REORDER,
+FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]
+  .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+.add(FlinkBatchRuleSets.JOIN_REORDER_PERPARE_RULES)
+.build(), "merge join into MultiJoin")
+  .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
+.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+.add(FlinkBatchRuleSets.JOIN_REORDER_RULES)
 
 Review comment:
   Join reorder rules are applied with the hep planner?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292724
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292571
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299292261
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290827
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhuzhurk commented on a change in pull request #8931: [FLINK-13041] Make ScheduleMode configurable via ExecutionConfig

2019-07-01 Thread GitBox
zhuzhurk commented on a change in pull request #8931: [FLINK-13041] Make 
ScheduleMode configurable via ExecutionConfig
URL: https://github.com/apache/flink/pull/8931#discussion_r299290712
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
 ##
 @@ -90,6 +90,8 @@
/** Defines how data exchange happens - batch or pipelined */
private ExecutionMode executionMode = ExecutionMode.PIPELINED;
 
+   private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
 Review comment:
   Does this mean that if users do not specify the `ScheduleMode`, even streaming 
jobs will be executed in batch style? 
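
   For illustration only, a minimal sketch of how a streaming job could override the
   proposed default. The setter name below is an assumption based on the PR title
   ("Make ScheduleMode configurable via ExecutionConfig") and is not an existing API:

       import org.apache.flink.api.common.ExecutionConfig;
       import org.apache.flink.runtime.jobgraph.ScheduleMode;
       import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

       public class ScheduleModeExample {
           public static void main(String[] args) {
               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               ExecutionConfig config = env.getConfig();
               // Hypothetical setter from this PR; without calling it, the proposed default
               // ScheduleMode.LAZY_FROM_SOURCES would apply even to a streaming job.
               config.setScheduleMode(ScheduleMode.EAGER);
           }
       }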


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290642
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290478
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290395
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299290244
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299289710
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299289630
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI

2019-07-01 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876629#comment-16876629
 ] 

sunjincheng commented on FLINK-13011:
-

Thanks [~dian.fu] :) That's great!

> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
> have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
> PyFlink distribution package built by FLINK-12962 to PyPI. 
> https://pypi.org/
> https://packaging.python.org/tutorials/packaging-projects/
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299288165
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
 
 Review comment:
   We only need the `SingleInputGate` from the tuple here, so it is simpler to write 
`SingleInputGate inputGate = buildInputGate(network, numberOfRemoteChannels, 
numberOfLocalChannels).f0` directly.
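
   For clarity, a sketch of the suggested simplification (the generic parameters of the
   helper's Tuple3 return type were stripped in the quoted mail and are therefore omitted):

       // Instead of unpacking the helper's Tuple3 into a local variable first,
       // take the gate directly from the returned tuple:
       SingleInputGate inputGate1 =
               buildInputGate(network, numberOfRemoteChannels, numberOfLocalChannels).f0;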


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299287676
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(buffersPerChannel)
+   
.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   try {
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = 
new FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = 
new ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge 
inputBufferPoolUsageGauge = new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   assertEquals(extraNetworkBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
buffersPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * buffersPerChannel 
+ extraNetworkBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   } finally {
+   closeableRegistry.close();
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 

[GitHub] [flink] zhijiangW commented on a change in pull request #8455: [FLINK-12284][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-07-01 Thread GitBox
zhijiangW commented on a change in pull request #8455: 
[FLINK-12284][Network,Metrics]Fix the incorrect inputBufferUsage metric in 
credit-based network mode
URL: https://github.com/apache/flink/pull/8455#discussion_r299287578
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the metrics for input buffers usage.
+ */
+public class InputBuffersMetricsTest extends TestLogger {
+
+   @Test
+   public void testCalculateTotalBuffersSize() throws IOException {
+   int numberOfRemoteChannels = 2;
+   int numberOfLocalChannels = 0;
+
+   int numberOfBufferPerChannel = 2;
+   int numberOfBuffersPerGate = 8;
+
+   NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+   .setNetworkBuffersPerChannel(numberOfBufferPerChannel)
+   
.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
+   .build();
+
+   Tuple3, 
List> tuple1 = buildInputGate(
+   network,
+   numberOfRemoteChannels,
+   numberOfLocalChannels);
+
+   SingleInputGate inputGate1 = tuple1.f0;
+
+   SingleInputGate[] inputGates = new 
SingleInputGate[]{inputGate1};
+   FloatingBuffersUsageGauge floatingBuffersUsageGauge = new 
FloatingBuffersUsageGauge(inputGates);
+   ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge = new 
ExclusiveBuffersUsageGauge(inputGates);
+   CreditBasedInputBuffersUsageGauge inputBufferPoolUsageGauge = 
new CreditBasedInputBuffersUsageGauge(
+   floatingBuffersUsageGauge,
+   exclusiveBuffersUsageGauge,
+   inputGates);
+
+   try (CloseableRegistry closeableRegistry = new 
CloseableRegistry()) {
+
+   closeableRegistry.registerCloseable(network::close);
+   closeableRegistry.registerCloseable(inputGate1::close);
+
+   assertEquals(numberOfBuffersPerGate, 
floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel, 
exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+   assertEquals(numberOfRemoteChannels * 
numberOfBufferPerChannel + numberOfBuffersPerGate, 
inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
+   }
+   }
+
+   @Test
+   public void testExclusiveBuffersUsage() throws IOException {
+   int numberOfRemoteChannelsGate1 = 2;
+   int numberOfLocalChannelsGate1 = 0;
+   int numberOfRemoteChannelsGate2 = 1;
+   int numberOfLocalChannelsGate2 = 1;
+
+   int numberOfRemoteChannelsTotal = numberOfRemoteChannelsGate1 + 
numberOfRemoteChannelsGate2;
+   int numberOfInputGates = 2;
+
+   int buffersPerChannel = 2;
+   int extraNetworkBuffersPerGate = 8;
+
+   Closea

[GitHub] [flink] c4emmmm commented on issue #8526: [FLINK-12597][ml] Remove the legacy flink-libraries/flink-ml

2019-07-01 Thread GitBox
c4e commented on issue #8526: [FLINK-12597][ml] Remove the legacy 
flink-libraries/flink-ml
URL: https://github.com/apache/flink/pull/8526#issuecomment-507495317
 
 
   > @c4e Agreed, feel free to pull in my commit from #8827 or I can reopen 
the PR. As part of FLIP-42 we are restructuring the docs and decided to fully 
remove the old FlinkML documentation. If users are still using this library 
from an old version, they can also use an old version of the docs. I would be 
opposed to adding an "under construction page", I find that more confusing.
   
   Thanks. After thinking about it more, I also oppose that, so I marked it with a strikethrough.
   
   Since the modification is almost done in #8827, I think it's better to 
reopen the PR and merge it. 
   The only docs that mention FlinkML are docs/internals/components.md and 
components.zh.md. You can find the keyword "FlinkML" and decide whether to 
remove them.
   
   @sjwiesman , would you please spend a little time on this if 
@shaoxuan-wang and @zentol agree to merge this and #8827 separately? What do 
you think, @shaoxuan-wang @zentol? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13050) Counting more checkpoint failure reason in CheckpointFailureManager

2019-07-01 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876617#comment-16876617
 ] 

vinoyang commented on FLINK-13050:
--

cc [~pnowojski] [~azagrebin] WDYT?

> Counting more checkpoint failure reason in CheckpointFailureManager
> ---
>
> Key: FLINK-13050
> URL: https://issues.apache.org/jira/browse/FLINK-13050
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, {{CheckpointFailureManager}} only counts a few failure reasons in 
> order to stay compatible with {{setFailOnCheckpointingErrors}}. However, 
> {{setFailOnCheckpointingErrors}} has been deprecated in FLINK-11662. IMO, we 
> can count more checkpoint failure reasons.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13050) Counting more checkpoint failure reason in CheckpointFailureManager

2019-07-01 Thread vinoyang (JIRA)
vinoyang created FLINK-13050:


 Summary: Counting more checkpoint failure reason in 
CheckpointFailureManager
 Key: FLINK-13050
 URL: https://issues.apache.org/jira/browse/FLINK-13050
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: vinoyang
Assignee: vinoyang


Currently, {{CheckpointFailureManager}} only counts a few failure reasons in order to 
stay compatible with {{setFailOnCheckpointingErrors}}. However, 
{{setFailOnCheckpointingErrors}} has been deprecated in FLINK-11662. IMO, we 
can count more checkpoint failure reasons.
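
For illustration of the proposal only (this is not the actual CheckpointFailureManager
API), a sketch that tallies failures per reason, assuming the existing
CheckpointFailureReason enum from the checkpointing runtime:

    import java.util.EnumMap;
    import java.util.Map;

    import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;

    // Hypothetical helper, not part of Flink: count failures per reason so that
    // more reasons than the current continuous-failure counter can be tracked.
    class FailureReasonCounter {
        private final Map<CheckpointFailureReason, Integer> counts =
                new EnumMap<>(CheckpointFailureReason.class);

        void record(CheckpointFailureReason reason) {
            counts.merge(reason, 1, Integer::sum);
        }

        int countOf(CheckpointFailureReason reason) {
            return counts.getOrDefault(reason, 0);
        }
    }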



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] liyafan82 closed pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism

2019-07-01 Thread GitBox
liyafan82 closed pull request #8934: [FLINK-12628][Runtime / Coordination] 
Check test failure if partition has no consumers in 
Execution.getPartitionMaxParallelism
URL: https://github.com/apache/flink/pull/8934
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 opened a new pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism

2019-07-01 Thread GitBox
liyafan82 opened a new pull request #8934: [FLINK-12628][Runtime / 
Coordination] Check test failure if partition has no consumers in 
Execution.getPartitionMaxParallelism
URL: https://github.com/apache/flink/pull/8934
 
 
   
   
   ## What is the purpose of the change
   
   Resolve issue FLINK-12628 Check test failure if partition has no consumers 
in Execution.getPartitionMaxParallelism:
   
   Currently, we work around this case in Execution.getPartitionMaxParallelism 
because of tests:
   
   // TODO consumers.isEmpty() only exists for test, currently there has to be 
exactly one consumer in real jobs!
   
   though a partition is supposed to always have at least one consumer at the moment.
   We should check which test fails and consider fixing it.
   
   According to my investigation, there is no test failure when the 
consumers.isEmpty() case is ignored.
   
   ## Brief change log
   
 - Change the implementation of Execution.getPartitionMaxParallelism so that 
it no longer handles the case where consumers.isEmpty() is true.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as ExecutionTest.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on a change in pull request #8934: [FLINK-12628][Runtime / Coordination] Check test failure if partition has no consumers in Execution.getPartitionMaxParallelism

2019-07-01 Thread GitBox
liyafan82 commented on a change in pull request #8934: [FLINK-12628][Runtime / 
Coordination] Check test failure if partition has no consumers in 
Execution.getPartitionMaxParallelism
URL: https://github.com/apache/flink/pull/8934#discussion_r299282016
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -679,14 +678,10 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
 	}
 
 	private static int getPartitionMaxParallelism(IntermediateResultPartition partition) {
-		// TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs!
 		final List<List<ExecutionEdge>> consumers = partition.getConsumers();
-		int maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
-		if (!consumers.isEmpty()) {
-			List<ExecutionEdge> consumer = consumers.get(0);
-			ExecutionJobVertex consumerVertex = consumer.get(0).getTarget().getJobVertex();
-			maxParallelism = consumerVertex.getMaxParallelism();
-		}
+		List<ExecutionEdge> consumer = consumers.get(0);
 
 Review comment:
   Maybe not. In my local environment, no exception was thrown when we ignore 
the case for the consumer being empty. Let's see if we can reproduce it in 
Travis.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on a change in pull request #8936: [FLINK-13043][Library / CEP] Fix the bug of parsing Dewey number from string

2019-07-01 Thread GitBox
liyafan82 commented on a change in pull request #8936: [FLINK-13043][Library / 
CEP] Fix the bug of parsing Dewey number from string
URL: https://github.com/apache/flink/pull/8936#discussion_r299281233
 
 

 ##
 File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 ##
 @@ -175,16 +175,18 @@ public String toString() {
 	public static DeweyNumber fromString(final String deweyNumberString) {
 		String[] splits = deweyNumberString.split("\\.");
 
-		if (splits.length == 0) {
+		if (splits.length == 1) {
 			return new DeweyNumber(Integer.parseInt(deweyNumberString));
-		} else {
+		} else if (splits.length > 0) {
 
 Review comment:
   @Myasuka thanks for the comments. 
   In some rare cases, the length can be 0. So this condition guards against 
such cases. 
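   
   For context, a small standalone snippet (illustration only, not part of the 
patch) showing why the extra guard matters: Java's `split` drops trailing empty 
strings, so the resulting array can indeed be empty.
   
   ```java
   public class SplitLengthDemo {
   	public static void main(String[] args) {
   		// "42" has no separator: one element, handled by the first branch.
   		System.out.println("42".split("\\.").length);    // 1
   
   		// "1.0.2" splits into three elements, handled by the else-if branch.
   		System.out.println("1.0.2".split("\\.").length); // 3
   
   		// A separator-only string: trailing empty strings are removed,
   		// leaving an empty array, so neither branch matches.
   		System.out.println(".".split("\\.").length);     // 0
   	}
   }
   ```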


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix 
the Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#discussion_r299279990
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -104,9 +103,19 @@ private Table 
convertConnectorTable(ConnectorCatalogTable table) {
}
 
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
+   TableSource tableSource;
Optional tableFactory = catalog.getTableFactory();
-   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-   
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+   if (tableFactory.isPresent()) {
+   TableFactory tf = tableFactory.get();
+   if (tf instanceof TableSourceFactory) {
+   tableSource = ((TableSourceFactory) 
tf).createTableSource(tablePath, table);
+   } else {
+   throw new 
TableException(String.format("TableFactory provided by catalog %s must 
implement TableSourceFactory",
 
 Review comment:
   minor: shall we use a similar error message to the one in 
`convertConnectorTable()`, something like "Cannot query a sink-only table. 
TableFactory provided by catalog %s must implement ".
   
   Asking this because when I first saw this exception, I got a bit confused 
about why it cannot be `TableSinkFactory`. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-13037) Translate "Concepts -> Glossary" page into Chinese

2019-07-01 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876607#comment-16876607
 ] 

Jark Wu commented on FLINK-13037:
-

Hi [~knaufk], could you link the prerequisite issue which will create this 
page/markdown?

> Translate "Concepts -> Glossary" page into Chinese
> --
>
> Key: FLINK-13037
> URL: https://issues.apache.org/jira/browse/FLINK-13037
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] ifndef-SleePy closed pull request #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-07-01 Thread GitBox
ifndef-SleePy closed pull request #8894:  [FLINK-12961][datastream] Providing 
an internal execution method of StreamExecutionEnvironment accepting 
StreamGraph as input parameter
URL: https://github.com/apache/flink/pull/8894
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-07-01 Thread GitBox
ifndef-SleePy commented on issue #8894:  [FLINK-12961][datastream] Providing an 
internal execution method of StreamExecutionEnvironment accepting StreamGraph 
as input parameter
URL: https://github.com/apache/flink/pull/8894#issuecomment-507490234
 
 
   Abandoning this PR since we think an alternative approach is better.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13037) Translate "Concepts -> Glossary" page into Chinese

2019-07-01 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-13037:

Component/s: Documentation
 chinese-translation

> Translate "Concepts -> Glossary" page into Chinese
> --
>
> Key: FLINK-13037
> URL: https://issues.apache.org/jira/browse/FLINK-13037
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Konstantin Knauf
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13049) Port planner expressions to blink-planner from flink-planner

2019-07-01 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-13049:


 Summary: Port planner expressions to blink-planner from 
flink-planner
 Key: FLINK-13049
 URL: https://issues.apache.org/jira/browse/FLINK-13049
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jingsong Lee
Assignee: Jingsong Lee






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-07-01 Thread GitBox
xuefuz commented on issue #8926: [FLINK-13021][table][hive] unify catalog 
partition implementations
URL: https://github.com/apache/flink/pull/8926#issuecomment-507481283
 
 
   Okay, thanks for the explanation. I guess both don't matter much right now. 
Let's revisit these comments when we do DDL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
xuefuz commented on a change in pull request #8940: [FLINK-13047][table] Fix 
the Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#discussion_r299262699
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -105,8 +105,10 @@ private Table 
convertConnectorTable(ConnectorCatalogTable table) {
 
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
Optional tableFactory = catalog.getTableFactory();
-   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-   
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table)).get();
 
 Review comment:
   Good point. However, that's not what I meant to do. HiveTableFactory is 
responsible for generating the table source/sink for both generic and Hive tables, so 
it needs to take care of both. In DatabaseCalciteSchema.convertCatalogTable(), 
it calls TableFactoryUtil.findAndCreateTableSource(table) only if the catalog 
doesn't return a table factory object. Thus, there is no duplication per se. 
However, I did realize that the logic here is confusing, especially with the 
Java Optional object. I will refactor the code a bit to make it clearer.
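   
   As a side note, one possible shape for such a refactoring (a sketch only, 
with raw types as in the current diff; the actual patch may well use an explicit 
`isPresent()` check instead) is to defer the fallback with `orElseGet`:
   
   ```java
   // Sketch only, not the actual patch: same logic as before, but the fallback
   // is wrapped in a Supplier, so TableFactoryUtil.findAndCreateTableSource(table)
   // is evaluated only when the catalog returns no table factory.
   TableSource tableSource = tableFactory
   	.map(tf -> ((TableSourceFactory) tf).createTableSource(tablePath, table))
   	.orElseGet(() -> TableFactoryUtil.findAndCreateTableSource(table));
   ```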


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function

2019-07-01 Thread GitBox
asfgit closed pull request #8669: [FLINK-11147][table][docs] Add documentation 
for TableAggregate Function
URL: https://github.com/apache/flink/pull/8669
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11147) Add documentation for TableAggregate Function

2019-07-01 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-11147.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master : 6a12908b15c398e37f8603cd84e0d30e14d07784

> Add documentation for TableAggregate Function
> -
>
> Key: FLINK-11147
> URL: https://issues.apache.org/jira/browse/FLINK-11147
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add documentation for {{TableAggregateFunction}}, similar to the document of 
> {{AggregateFunction}}: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#aggregation-functions
> Most parts of {{TableAggregateFunction}} would be the same as 
> {{AggregateFunction}}, except for the way outputs are handled: 
> {{AggregateFunction}} outputs a scalar value, while 
> {{TableAggregateFunction}} outputs a table with multiple rows and columns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-07-01 Thread GitBox
bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog 
partition implementations
URL: https://github.com/apache/flink/pull/8926#issuecomment-507466762
 
 
   Good questions.
   
   > 1. I saw a few configuration files for Hive (table/db, etc) and each has 
just few constants. I'm wondering if we should just have one file that covers 
all those configs.
   
   Exactly. I've observed that too. I plan to merge them into a single 
`HiveCatalogConfig` in a follow-up.
   
   > 2. When we create Catalog objects (table/db/partition), we remove 
"comment" from the property map. I'm wondering how that changes desc command 
output for those objects because ideally comment should be shown, at least for 
extended mode.
   
   We only do that for table and partition.
   
   - Hive's `Database` has a `description` field to store the comment and we use 
that, thus that's the same output for 'DESC'
   - Hive DDL for tables (e.g. `CREATE TABLE xx COMMENT xxx`) stores the comment as 
just a 'comment' property, so that's the same output for 'DESC'
   - AFAIK, Hive DDL doesn't support partition comments, thus it shouldn't 
matter how we store it in properties
   
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8669: [FLINK-11147][table][docs] Add documentation for TableAggregate Function

2019-07-01 Thread GitBox
flinkbot edited a comment on issue #8669: [FLINK-11147][table][docs] Add 
documentation for TableAggregate Function
URL: https://github.com/apache/flink/pull/8669#issuecomment-500274671
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix 
the Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#discussion_r299256624
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -105,8 +105,10 @@ private Table 
convertConnectorTable(ConnectorCatalogTable table) {
 
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
Optional tableFactory = catalog.getTableFactory();
-   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-   
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table)).get();
 
 Review comment:
   By comparing the logic here with `HiveTableFactory.createTableSource()`, I 
found there are duplicated calls to 
`TableFactoryUtil.findAndCreateTableSource(table)`. That made me wonder whether any 
table factory implementation shouldn't call the generic table factory discovery service 
(a.k.a. `TableFactoryUtil.findAndCreateTableSource`), and should instead just create 
tables that are specific to that table factory itself. 
   
   Thus, `HiveTableFactory.createTableSource` should just be as follows:
   
   ```
   @Override
public TableSource createTableSource(ObjectPath tablePath, 
CatalogTable table) {
...
   
if (!isGeneric) {
return createInputFormatTableSource(tablePath, table);
} else {
// return 
TableFactoryUtil.findAndCreateTableSource(table);
return null;
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
bowenli86 commented on a change in pull request #8940: [FLINK-13047][table] Fix 
the Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#discussion_r299256624
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -105,8 +105,10 @@ private Table 
convertConnectorTable(ConnectorCatalogTable table) {
 
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
Optional tableFactory = catalog.getTableFactory();
-   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
-   
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
+   TableSource tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table)).get();
 
 Review comment:
   By comparing the logic here with `HiveTableFactory.createTableSource()`, I 
found there are duplicated calls to 
`TableFactoryUtil.findAndCreateTableSource(table)`. That made me wonder whether any 
table factory implementation shouldn't call the generic table factory discovery service 
(a.k.a. `TableFactoryUtil.findAndCreateTableSource`), and should instead just create 
tables that are specific to that table factory itself. 
   
   Thus, `HiveTableFactory.createTableSource` should just be as follows:
   
   ```
   @Override
public TableSource createTableSource(ObjectPath tablePath, 
CatalogTable table) {
...
   
if (!isGeneric) {
return createInputFormatTableSource(tablePath, table);
} else {
return null;
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …

2019-07-01 Thread GitBox
flinkbot commented on issue #8941: [FLINK-13048][hive] support decimal in 
Flink's integration with Hive …
URL: https://github.com/apache/flink/pull/8941#issuecomment-507462215
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13048) support decimal in Flink's integration with Hive user defined functions

2019-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13048:
---
Labels: pull-request-available  (was: )

> support decimal in Flink's integration with Hive user defined functions
> ---
>
> Key: FLINK-13048
> URL: https://issues.apache.org/jira/browse/FLINK-13048
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 opened a new pull request #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …

2019-07-01 Thread GitBox
bowenli86 opened a new pull request #8941: [FLINK-13048][hive] support decimal 
in Flink's integration with Hive …
URL: https://github.com/apache/flink/pull/8941
 
 
   …user defined functions
   
   ## What is the purpose of the change
   
   This PR adds support for decimal in Flink's integration with Hive user 
defined functions.
   
   ## Brief change log
   
   - added decimal conversions in util methods of `HiveInspectors`
   - added unit tests
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   See newly added unit tests in `HiveSimpleUDFTest`, `HiveGenericUDFTest` and 
`HiveGenericUDAFTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8941: [FLINK-13048][hive] support decimal in Flink's integration with Hive …

2019-07-01 Thread GitBox
bowenli86 commented on issue #8941: [FLINK-13048][hive] support decimal in 
Flink's integration with Hive …
URL: https://github.com/apache/flink/pull/8941#issuecomment-507461988
 
 
   cc @xuefuz @JingsongLi @lirui-apache @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13048) support decimal in Flink's integration with Hive user defined functions

2019-07-01 Thread Bowen Li (JIRA)
Bowen Li created FLINK-13048:


 Summary: support decimal in Flink's integration with Hive user 
defined functions
 Key: FLINK-13048
 URL: https://issues.apache.org/jira/browse/FLINK-13048
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-07-01 Thread GitBox
xuefuz commented on a change in pull request #8926: [FLINK-13021][table][hive] 
unify catalog partition implementations
URL: https://github.com/apache/flink/pull/8926#discussion_r299243763
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -623,8 +622,10 @@ public void createPartition(ObjectPath tablePath, 
CatalogPartitionSpec partition
checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be 
null");
checkNotNull(partition, "Partition cannot be null");
 
-   if (!(partition instanceof HiveCatalogPartition)) {
-   throw new CatalogException("Currently only supports 
HiveCatalogPartition");
+   boolean isGeneric = 
Boolean.valueOf(partition.getProperties().get(CatalogConfig.IS_GENERIC));
+
+   if (isGeneric) {
+   throw new CatalogException("Currently only supports 
non-generic CatalogPartition");
 
 Review comment:
   nit: The msg doesn't seem to be a complete sentence. Maybe something like 
"only xxx is supported currently". Same as below.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
flinkbot commented on issue #8940: [FLINK-13047][table] Fix the 
Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#issuecomment-507448909
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13047) Fix the Optional.orElse() usage issue in DatabaseCalciteSchema

2019-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13047:
---
Labels: pull-request-available  (was: )

> Fix the Optional.orElse() usage issue in DatabaseCalciteSchema 
> ---
>
> Key: FLINK-13047
> URL: https://issues.apache.org/jira/browse/FLINK-13047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>
> It's found that Optional.orElse() will evaluate the argument first before 
> returning Optional.get(). If the evaluation throws an exception then the call 
> fails even if the Optional object is nonempty. This is the case in 
> {{DatabaseCalciteSchema.convertCatalogTable()}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
xuefuz commented on issue #8940: [FLINK-13047][table] Fix the Optional.orElse() 
usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940#issuecomment-507448645
 
 
   cc: @bowenli86 @lirui-apache @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz opened a new pull request #8940: [FLINK-13047][table] Fix the Optional.orElse() usage issue in Databas…

2019-07-01 Thread GitBox
xuefuz opened a new pull request #8940: [FLINK-13047][table] Fix the 
Optional.orElse() usage issue in Databas…
URL: https://github.com/apache/flink/pull/8940
 
 
   …eCalciteSchema
   
   
   
   ## What is the purpose of the change
   
   [FLINK-13047][table] Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema
   
   
   ## Brief change log
   
   
 - Evaluate the expression in the orElse() clause only if the Optional object is 
empty.
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): ( no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13047) Fix the Optional.orElse() usage issue in DatabaseCalciteSchema

2019-07-01 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-13047:
---

 Summary: Fix the Optional.orElse() usage issue in 
DatabaseCalciteSchema 
 Key: FLINK-13047
 URL: https://issues.apache.org/jira/browse/FLINK-13047
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang


It's found that Optional.orElse() will evaluate the argument first before 
returning Optional.get(). If the evaluation throws an exception then the call 
fails even if the Optional object is nonempty. This is the case in 
{{DatabaseCalciteSchema.convertCatalogTable()}}.
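
A minimal, self-contained illustration of the underlying Java behaviour (not Flink code; the fix in the PR uses an explicit presence check instead):

```java
import java.util.Optional;

public class OrElseDemo {

	private static String expensiveFallback() {
		// When passed to orElse(expensiveFallback()), this runs eagerly,
		// even though the Optional below is non-empty.
		throw new IllegalStateException("fallback evaluated");
	}

	public static void main(String[] args) {
		Optional<String> present = Optional.of("value");

		// Throws, because the argument is evaluated before orElse() is called:
		// present.orElse(expensiveFallback());

		// Fine: the supplier is only invoked when the Optional is empty.
		System.out.println(present.orElseGet(OrElseDemo::expensiveFallback));
	}
}
```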



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5478) Redis Sink Connector should allow update of command without reinstatiation

2019-07-01 Thread Ton van Bart (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876472#comment-16876472
 ] 

Ton van Bart edited comment on FLINK-5478 at 7/1/19 8:28 PM:
-

I think calling {{getCommandDescription()}} on every call to {{invoke()}} would 
be wrong, as this would mean that also the Redis command to use could change on 
every call, instead of only the additional key. 
My suggestion would be to enhance the RedisMapper with an extra method 
{{getAdditionalKey(T data)}} which returns {{Optional}} and give the 
interface a {{default}} implementation which returns {{Optional.empty()}}. This 
way the interface remains backwards compatible with existing implementations of 
it. 

In {{RedisSink}} the only change would be to change {{this.additionalKey}} with 
{{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; 
this also will give backwards compatibility with any existing code that uses 
this class.

I tested this change on my fork and all the tests still pass.
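
For illustration, a minimal sketch of the suggested shape of the change (names and signatures other than {{getAdditionalKey}} are simplified here and should be checked against the actual Bahir {{RedisMapper}} interface):

```java
import java.io.Serializable;
import java.util.Optional;

// Sketch only; the real interface lives in the bahir-flink Redis connector
// and has additional members (e.g. getCommandDescription()).
public interface RedisMapper<T> extends Serializable {

	String getKeyFromData(T data);

	String getValueFromData(T data);

	// Proposed addition: an optional per-record key, e.g. a date-based hash name.
	// The default implementation keeps existing mappers backwards compatible.
	default Optional<String> getAdditionalKey(T data) {
		return Optional.empty();
	}
}
```

{{RedisSink}} would then use {{mapper.getAdditionalKey(input).orElse(this.additionalKey)}} in the applicable methods, as described above.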


was (Author: tonvanbart):
I think calling {{getCommandDescription()}} on every call to {{invoke()}} would 
be wrong, as this would mean that also the Redis command to use could change on 
every call. My suggestion would be to enhance the RedisMapper with an extra 
method {{getAdditionalKey(T data)}} which returns {{Optional}} and give 
the interface a {{default}} implementation which returns {{Optional.empty()}}. 
This way the interface remains backwards compatible with existing 
implementations of it. 

In {{RedisSink}} the only change would be to change {{this.additionalKey}} with 
{{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; 
this also will give backwards compatibility with any existing code that uses 
this class.

I tested this change on my fork and all the tests still pass.

> Redis Sink Connector should allow update of command without reinstatiation
> --
>
> Key: FLINK-5478
> URL: https://issues.apache.org/jira/browse/FLINK-5478
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.1.4
>Reporter: Atharva Inamdar
>Priority: Major
>
> `getCommandDescription()` gets called when RedisSink is instantiated. This 
> happens only once and thus doesn't allow the command to be updated during run 
> time.
> Use Case:
> As a dev I want to store some data by day. So each key will have some date 
> specified. This will change over the course of time. For example: 
> `counts_for_148426560` for 2017-01-13. This is not limited to any 
> particular command. 
> connector: 
> https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java#L114
> I wish `getCommandDescription()` could be called in `invoke()` so that the 
> key can be updated without having to restart.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5478) Redis Sink Connector should allow update of command without reinstatiation

2019-07-01 Thread Ton van Bart (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876472#comment-16876472
 ] 

Ton van Bart edited comment on FLINK-5478 at 7/1/19 8:27 PM:
-

I think calling {{getCommandDescription()}} on every call to {{invoke()}} would 
be wrong, as this would mean that also the Redis command to use could change on 
every call. My suggestion would be to enhance the RedisMapper with an extra 
method {{getAdditionalKey(T data)}} which returns {{Optional}} and give 
the interface a {{default}} implementation which returns {{Optional.empty()}}. 
This way the interface remains backwards compatible with existing 
implementations of it. 

In {{RedisSink}} the only change would be to change {{this.additionalKey}} with 
{{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; 
this also will give backwards compatibility with any existing code that uses 
this class.

I tested this change on my fork and all the tests still pass.


was (Author: tonvanbart):
I think calling {{getCommandDescription()}} on every call to {{invoke()}} would 
be wrong, as this would mean that also the Redis command to use could change on 
every call. My suggestion would be to enhance the RedisMapper with an extra 
method {{getAdditionalKey(T data)}} which returns {{Optional}} and give 
the interface a {{default}} implementation which returns {{Optional.empty()}}. 
This way the interface remains backwards compatible with existing 
implementations of it. 

In {{RedisSink}} the only change would be to change {{this.additionalKey}} with 
{{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; 
this also will give backwards compatibility with any existing code that uses 
this class.

> Redis Sink Connector should allow update of command without reinstatiation
> --
>
> Key: FLINK-5478
> URL: https://issues.apache.org/jira/browse/FLINK-5478
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.1.4
>Reporter: Atharva Inamdar
>Priority: Major
>
> `getCommandDescription()` gets called when RedisSink is instantiated. This 
> happens only once and thus doesn't allow the command to be updated during run 
> time.
> Use Case:
> As a dev I want to store some data by day. So each key will have some date 
> specified. This will change over the course of time. For example: 
> `counts_for_148426560` for 2017-01-13. This is not limited to any 
> particular command. 
> connector: 
> https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java#L114
> I wish `getCommandDescription()` could be called in `invoke()` so that the 
> key can be updated without having to restart.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5478) Redis Sink Connector should allow update of command without reinstatiation

2019-07-01 Thread Ton van Bart (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16876472#comment-16876472
 ] 

Ton van Bart commented on FLINK-5478:
-

I think calling {{getCommandDescription()}} on every call to {{invoke()}} would 
be wrong, as this would mean that also the Redis command to use could change on 
every call. My suggestion would be to enhance the RedisMapper with an extra 
method {{getAdditionalKey(T data)}} which returns {{Optional}} and give 
the interface a {{default}} implementation which returns {{Optional.empty()}}. 
This way the interface remains backwards compatible with existing 
implementations of it. 

In {{RedisSink}} the only change would be to change {{this.additionalKey}} with 
{{optAdditionalKey.orElse(this.additionalKey)}} for the applicable methods; 
this also will give backwards compatibility with any existing code that uses 
this class.

> Redis Sink Connector should allow update of command without reinstatiation
> --
>
> Key: FLINK-5478
> URL: https://issues.apache.org/jira/browse/FLINK-5478
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.1.4
>Reporter: Atharva Inamdar
>Priority: Major
>
> `getCommandDescription()` gets called when RedisSink is instantiated. This 
> happens only once and thus doesn't allow the command to be updated during run 
> time.
> Use Case:
> As a dev I want to store some data by day. So each key will have some date 
> specified. This will change over the course of time. For example: 
> `counts_for_148426560` for 2017-01-13. This is not limited to any 
> particular command. 
> connector: 
> https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java#L114
> I wish `getCommandDescription()` could be called in `invoke()` so that the 
> key can be updated without having to restart.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…

2019-07-01 Thread GitBox
flinkbot commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to 
hive-conf-dir to be cons…
URL: https://github.com/apache/flink/pull/8939#issuecomment-507400400
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13046) rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive

2019-07-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13046:
---
Labels: pull-request-available  (was: )

> rename hive-site-path to hive-conf-dir to be consistent with standard name in 
> Hive
> --
>
> Key: FLINK-13046
> URL: https://issues.apache.org/jira/browse/FLINK-13046
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…

2019-07-01 Thread GitBox
bowenli86 commented on issue #8939: [FLINK-13046][hive] rename hive-site-path 
to hive-conf-dir to be cons…
URL: https://github.com/apache/flink/pull/8939#issuecomment-507400080
 
 
   cc @xuefuz @lirui-apache @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8939: [FLINK-13046][hive] rename hive-site-path to hive-conf-dir to be cons…

2019-07-01 Thread GitBox
bowenli86 opened a new pull request #8939: [FLINK-13046][hive] rename 
hive-site-path to hive-conf-dir to be cons…
URL: https://github.com/apache/flink/pull/8939
 
 
   …istent with standard name in Hive
   
   ## What is the purpose of the change
   
   This PR renames the SQL CLI config key for HiveCatalog from `hive-site-path` 
to `hive-conf-dir`, which is consistent with the standard Hive conf name.
   
   ## Brief change log
   
   - renamed the config from `hive-site-path` to `hive-conf-dir`
   - updated a few method names and yaml files of unit tests 
   - updated documentation
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*ExecutionContextTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-07-01 Thread GitBox
bowenli86 commented on issue #8926: [FLINK-13021][table][hive] unify catalog 
partition implementations
URL: https://github.com/apache/flink/pull/8926#issuecomment-507395104
 
 
   cc @xuefuz @lirui-apache @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13046) rename hive-site-path to hive-conf-dir to be consistent with standard name in Hive

2019-07-01 Thread Bowen Li (JIRA)
Bowen Li created FLINK-13046:


 Summary: rename hive-site-path to hive-conf-dir to be consistent 
with standard name in Hive
 Key: FLINK-13046
 URL: https://issues.apache.org/jira/browse/FLINK-13046
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-01 Thread GitBox
flinkbot edited a comment on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-506515812
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @dawidwys [committer], @twalthr [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8920: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

2019-07-01 Thread GitBox
bowenli86 commented on issue #8920: [FLINK-13024][table] integrate 
FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#issuecomment-507386500
 
 
   @flinkbot attention @twalthr @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 build to Travis

2019-07-01 Thread GitBox
flinkbot edited a comment on issue #8911: [FLINK-12995][hive] Add Hive-1.2.1 
build to Travis
URL: https://github.com/apache/flink/pull/8911#issuecomment-506262493
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @zentol [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

