[GitHub] [flink] flinkbot edited a comment on issue #10284: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10284: [FLINK-14735][scheduler] Improve 
scheduling of all-to-all partitions with ALL input constraint for legacy 
scheduler
URL: https://github.com/apache/flink/pull/10284#issuecomment-557162273
 
 
   
   ## CI report:
   
   * 55bb7f0c227441d894acdfdea48610e4ccdbf12c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137619209)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10289: [FLINK-14924][table sql / api] 
CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289#issuecomment-557424125
 
 
   
   ## CI report:
   
   * e668b7fee390243db8312cc7a4ce3b71fb6f15a3 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137717898)
   
   




[jira] [Commented] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979943#comment-16979943
 ] 

Jingsong Lee commented on FLINK-14924:
--

FYI: see `CsvRowDeserializationSchema.Builder.setNullLiteral` and 
`CsvValidator.FORMAT_NULL_LITERAL`.
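
For illustration, a minimal sketch of that hook (assuming the Flink 1.9 `flink-csv` API; the row schema and the literal are made up):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;

// Cells whose raw text equals the configured null literal (here "N/A",
// an arbitrary choice) are deserialized as SQL NULL.
CsvRowDeserializationSchema schema =
    new CsvRowDeserializationSchema.Builder(Types.ROW(Types.INT, Types.STRING))
        .setNullLiteral("N/A")
        .build();
```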

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349468600
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -191,12 +191,24 @@ object ScalarOperatorGens {
 
   case (TIMESTAMP_WITHOUT_TIME_ZONE, INTERVAL_DAY_TIME) =>
 generateOperatorIfNotNull(ctx, left.resultType, left, right) {
-  (l, r) => s"$l $op $r"
+  (l, r) => {
+val leftTerm = s"$l.getMillisecond()"
+val nanoTerm = s"$l.getNanoOfMillisecond()"
+s"$SQL_TIMESTAMP.fromEpochMillis($leftTerm $op $r, $nanoTerm)"
 
 Review comment:
   Not needed now. We can add such things if we support more datetime arithmetic 
in the future.
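
For reference, a minimal sketch of what the generated expression above computes (assuming the `SqlTimestamp` accessors shown in the diff; the helper itself is hypothetical):

```java
import org.apache.flink.table.dataformat.SqlTimestamp;

// The day-time interval shifts only the epoch-millisecond part; the
// nano-of-millisecond remainder is carried over unchanged.
static SqlTimestamp plusInterval(SqlTimestamp ts, long intervalMillis) {
    return SqlTimestamp.fromEpochMillis(
        ts.getMillisecond() + intervalMillis,
        ts.getNanoOfMillisecond());
}
```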




[GitHub] [flink] flinkbot edited a comment on issue #10283: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10283: [FLINK-14735][scheduler] Improve 
scheduling of all-to-all partitions with ALL input constraint for legacy 
scheduler
URL: https://github.com/apache/flink/pull/10283#issuecomment-55716
 
 
   
   ## CI report:
   
   * 9bc37b6f7ab760f29d28c3cc9763a19de17fb484 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137616270)
   
   




[jira] [Commented] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979940#comment-16979940
 ] 

Jingsong Lee commented on FLINK-14924:
--

Just curious:

I'm not sure we should add this `empty column as null` option.

Isn't the more standard approach a `null literal`? Can a `null literal` meet this 
requirement?

If so, should we add `null literal` support and say that we only support the empty 
string for now, until we switch to the new standard CSV source?
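
For context, a sketch of how a `null literal` could meet the requirement with the newer CSV descriptor (a minimal sketch, assuming the `flink-csv` `Csv` descriptor; the literal choice is arbitrary):

```java
import org.apache.flink.table.descriptors.Csv;

// Configuring the empty string as the null literal would make empty
// columns come out as SQL NULL instead of "".
Csv format = new Csv()
    .fieldDelimiter(',')
    .nullLiteral("");
```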

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  





[GitHub] [flink] liupc edited a comment on issue #10269: [FLINK-14876]Putting xercesImpl related classes into alwaysParentFirstLoaderPatterns to avoid conflicts

2019-11-21 Thread GitBox
liupc edited a comment on issue #10269: [FLINK-14876]Putting xercesImpl related 
classes into alwaysParentFirstLoaderPatterns to avoid conflicts
URL: https://github.com/apache/flink/pull/10269#issuecomment-557429329
 
 
   Hi @aljoscha,
   The Hadoop version of our cluster is 2.6, so I changed the `hadoop.version` 
in the `flink-shaded-hadoop-2` project, compiled it, and then used some of its 
classes in my app. Because the `xerces` package was introduced into Hadoop in 
2.6.0 (https://issues.apache.org/jira/browse/HDFS-4629), its classes were also 
packaged into my app jar, causing the failure. 
   I think that even if I don't depend on `flink-shaded-hadoop-2` but use the 
Hadoop dependencies directly, there is a risk that this `xercesImpl` would be 
introduced.
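
For anyone hitting this before the pattern is hardcoded, a hedged workaround sketch (using the standard `classloader.parent-first-patterns.additional` option rather than the change in this PR; the same key can also be set in `flink-conf.yaml`):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

// Force xerces classes to be loaded parent-first for one deployment,
// mirroring what the PR adds to the default patterns.
Configuration conf = new Configuration();
conf.setString(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
    "org.apache.xerces.");
```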
   




[GitHub] [flink] yanghua commented on issue #10288: [FLINK-14765] Remove STATE_UPDATER in Execution

2019-11-21 Thread GitBox
yanghua commented on issue #10288: [FLINK-14765] Remove STATE_UPDATER in 
Execution
URL: https://github.com/apache/flink/pull/10288#issuecomment-557429438
 
 
   cc @GJL 




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349466855
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/MethodCallGen.scala
 ##
 @@ -34,13 +33,14 @@ class MethodCallGen(method: Method) extends CallGenerator {
   returnType: LogicalType): GeneratedExpression = {
 generateCallIfArgsNotNull(ctx, returnType, operands, 
!method.getReturnType.isPrimitive) {
   originalTerms => {
-val terms = originalTerms.zip(method.getParameterTypes).map { case 
(term, clazz) =>
-  // convert the BinaryString parameter to String if the method 
parameter accept String
-  if (clazz == classOf[String]) {
-s"$term.toString()"
-  } else {
-term
-  }
+val terms = 
originalTerms.zipWithIndex.zip(method.getParameterTypes).map {
 
 Review comment:
   Incorrect intermediate state: the `zipWithIndex` was introduced to fix the 
original TO_TIMESTAMP. I will remove it.




[GitHub] [flink] liupc commented on issue #10269: [FLINK-14876]Putting xercesImpl related classes into alwaysParentFirstLoaderPatterns to avoid conflicts

2019-11-21 Thread GitBox
liupc commented on issue #10269: [FLINK-14876]Putting xercesImpl related 
classes into alwaysParentFirstLoaderPatterns to avoid conflicts
URL: https://github.com/apache/flink/pull/10269#issuecomment-557429329
 
 
   Hi @aljoscha,
   The Hadoop version of our cluster is 2.6, so I changed the `hadoop.version` 
in the `flink-shaded-hadoop-2` project, compiled it, and then used some of its 
classes in my app. Because the `xerces` package was introduced into Hadoop in 
2.6.0 (https://issues.apache.org/jira/browse/HDFS-4629), its classes were also 
packaged into my app jar, causing the failure. 
   I think that even if I don't depend on `flink-shaded-hadoop-2` but use the 
Hadoop dependencies directly, there is a risk that this `xercesImpl` would be 
introduced.
   




[jira] [Assigned] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-14924:
--

Assignee: Leonard Xu

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  





[jira] [Commented] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979937#comment-16979937
 ] 

Kurt Young commented on FLINK-14924:


Assigned to you [~Leonard Xu]

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  





[GitHub] [flink] KurtYoung commented on a change in pull request #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10289: [FLINK-14924][table sql 
/ api] CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289#discussion_r349465056
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -129,8 +163,43 @@ public CsvTableSource(
boolean ignoreFirstLine,
String ignoreComments,
boolean lenient) {
-   this(new CsvInputFormatConfig(path, fieldNames, fieldTypes, 
selectedFields,
-   fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine, 
ignoreComments, lenient));
+   this(path, fieldNames, fieldTypes, selectedFields,
+   fieldDelim, lineDelim, quoteCharacter, ignoreFirstLine,
+   ignoreComments, lenient, false);
+   }
+
+   /**
+* A {@link InputFormatTableSource} and {@link LookupableTableSource} 
for simple CSV files with
+* a (logically) unlimited number of fields.
+*
+* @param path  The path to the CSV file.
+* @param fieldNamesThe names of the table fields.
+* @param fieldTypesThe types of the table fields.
+* @param selectedFieldsThe fields which will be read and 
returned by the table source. If
+*  None, all fields are returned.
+* @param fieldDelimThe field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacterAn optional quote character for String 
values, null by default.
+* @param ignoreFirstLine   Flag to ignore the first line, false by 
default.
+* @param ignoreCommentsAn optional prefix to indicate 
comments, null by default.
+* @param lenient   Flag to skip records with parse error 
instead to fail, false by
+*  default.
+* @param emptyColumnAsNull Flag to treat empty column as null.
+*/
+   public CsvTableSource(
+   String path,
 
 Review comment:
   nit: indent 2 tabs




[GitHub] [flink] KurtYoung commented on a change in pull request #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10289: [FLINK-14924][table sql 
/ api] CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289#discussion_r349464983
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSource.java
 ##
 @@ -101,6 +101,40 @@ public CsvTableSource(
quoteCharacter, ignoreFirstLine, ignoreComments, 
lenient);
}
 
+   /**
+* A {@link InputFormatTableSource} and {@link LookupableTableSource} 
for simple CSV files with
+* a (logically) unlimited number of fields.
+*
+* @param path  The path to the CSV file.
+* @param fieldNamesThe names of the table fields.
+* @param fieldTypesThe types of the table fields.
+* @param fieldDelimThe field delimiter, "," by default.
+* @param lineDelim The row delimiter, "\n" by default.
+* @param quoteCharacterAn optional quote character for String 
values, null by default.
+* @param ignoreFirstLine   Flag to ignore the first line, false by 
default.
+* @param ignoreCommentsAn optional prefix to indicate 
comments, null by default.
+* @param lenient   Flag to skip records with parse error 
instead to fail, false by
+*  default.
+* @param emptyColumnAsNull Flag to treat empty column as null.
+*/
+   public CsvTableSource(
 
 Review comment:
   How about deprecating all existing constructors and encouraging users to use 
the `Builder` instead? We could then have a single unified private constructor.
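
A minimal sketch of the suggested builder-first usage (`CsvTableSource.builder()` already exists; the `emptyColumnAsNull()` switch is hypothetical, mirroring the new flag):

```java
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.CsvTableSource;

CsvTableSource source = CsvTableSource.builder()
    .path("/path/to/data.csv")     // sample path, an assumption
    .field("id", Types.INT())
    .field("name", Types.STRING())
    .emptyColumnAsNull()           // hypothetical builder method for the new option
    .build();
```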




[GitHub] [flink] zjuwangg commented on a change in pull request #10272: [FLINK-14878]Support `use catalog` through sqlUpdate() method in TableEnvironment

2019-11-21 Thread GitBox
zjuwangg commented on a change in pull request #10272: [FLINK-14878]Support 
`use  catalog` through sqlUpdate() method in TableEnvironment
URL: https://github.com/apache/flink/pull/10272#discussion_r348873978
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseCatalog.java
 ##
 @@ -35,7 +35,7 @@
  */
 public class SqlUseCatalog extends SqlCall {
 
-   public static final SqlSpecialOperator OPERATOR = new 
SqlSpecialOperator("USE CATALOG", SqlKind.OTHER);
+   public static final SqlSpecialOperator OPERATOR = new 
SqlSpecialOperator("USE CATALOG", SqlKind.OTHER_DDL);
 
 Review comment:
   Both `validate(sqlNode: SqlNode)` methods in `FlinkPlannerImpl` (in 
flink-planner and blink-planner) validate the row type; the plain `SqlKind.OTHER` 
kind would fail that validation.
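
Roughly, the guard in question looks like this sketch (an illustration only, not the actual `FlinkPlannerImpl` code; it relies on Calcite's `SqlKind.DDL` set containing `OTHER_DDL` but not `OTHER`):

```java
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;

// DDL-like statements carry no row type, so they must be returned before
// the validator tries to derive one; plain OTHER falls through and fails.
static SqlNode validate(SqlNode sqlNode) {
    if (sqlNode.getKind().belongsTo(SqlKind.DDL)) {
        return sqlNode; // skip row-type validation
    }
    // ... run the full Calcite validation for queries/DML ...
    return sqlNode;
}
```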




[GitHub] [flink] flinkbot edited a comment on issue #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10170: [FLINK-14722][hadoop] Optimize 
mapred.HadoopInputSplit to not serialize conf when split is not configurable
URL: https://github.com/apache/flink/pull/10170#issuecomment-553318094
 
 
   
   ## CI report:
   
   * 7ded222adb69e94dc572de2e3aa58d63e619f557 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/136289243)
   * e214405036b88521e20fd464f11df9be487251d7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137706181)
   * 6810a4293a5a9529c8e8b374b6aa149396830c55 : UNKNOWN
   
   




[jira] [Commented] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Leonard Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979930#comment-16979930
 ] 

Leonard Xu commented on FLINK-14924:


[~ykt836] Thanks for your tips. Could you help assign this to me?

I submitted a PR for this issue; it is the root cause of 
FLINK-14886, FLINK-14895, and FLINK-14900. 
Sorry for rushing to file those issues; I'll be more careful when reporting an issue.

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  





[GitHub] [flink] flinkbot edited a comment on issue #10287: [FLINK-14915][runtime] Remove the unnecessary param JobGraph from SchedulingStrategyFactory#createInstance

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10287: [FLINK-14915][runtime] Remove the 
unnecessary param JobGraph from SchedulingStrategyFactory#createInstance
URL: https://github.com/apache/flink/pull/10287#issuecomment-557403940
 
 
   
   ## CI report:
   
   * 91a9712e38f270006e6577e2976deb9993bb8243 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137712205)
   
   




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349462103
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
 ##
 @@ -437,7 +439,9 @@ class SortCodeGenerator(
 t.getTypeRoot match {
   case _ if PlannerTypeUtils.isPrimitive(t) => true
   case VARCHAR | CHAR | VARBINARY | BINARY |
-   DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE => true
+   DATE | TIME_WITHOUT_TIME_ZONE => true
+  case TIMESTAMP_WITHOUT_TIME_ZONE =>
 
 Review comment:
   Added TODO comments




[GitHub] [flink] flinkbot commented on issue #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
flinkbot commented on issue #10289: [FLINK-14924][table sql / api] 
CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289#issuecomment-557424125
 
 
   
   ## CI report:
   
   * e668b7fee390243db8312cc7a4ce3b71fb6f15a3 : UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on issue #10288: [FLINK-14765] Remove STATE_UPDATER in Execution

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10288: [FLINK-14765] Remove STATE_UPDATER 
in Execution
URL: https://github.com/apache/flink/pull/10288#issuecomment-557403970
 
 
   
   ## CI report:
   
   * 018cc229676d6c6634ce4113342f46b42eb24416 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137712216)
   
   




[GitHub] [flink] flinkbot edited a comment on issue #10283: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10283: [FLINK-14735][scheduler] Improve 
scheduling of all-to-all partitions with ALL input constraint for legacy 
scheduler
URL: https://github.com/apache/flink/pull/10283#issuecomment-55716
 
 
   
   ## CI report:
   
   * 9bc37b6f7ab760f29d28c3cc9763a19de17fb484 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137616270)
   
   




[GitHub] [flink] zhuzhurk commented on issue #10283: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
zhuzhurk commented on issue #10283: [FLINK-14735][scheduler] Improve scheduling 
of all-to-all partitions with ALL input constraint for legacy scheduler
URL: https://github.com/apache/flink/pull/10283#issuecomment-557423556
 
 
   @flinkbot run travis




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349461114
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
 ##
 @@ -44,7 +44,7 @@ public RowtimeProcessFunction(int rowtimeIdx, 
TypeInformation returnTyp
 
@Override
public void processElement(BaseRow value, Context ctx, 
Collector out) throws Exception {
-   long timestamp = value.getLong(rowtimeIdx);
+   long timestamp = value.getTimestamp(rowtimeIdx, 
3).getMillisecond();
 
 Review comment:
   Looks like it is very easy to pass the precision.
   So, like Jark, I would suggest just passing it in to avoid more hardcoding.
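
A minimal sketch of threading the precision through (the class is a stand-in for `RowtimeProcessFunction`, assuming blink's `BaseRow#getTimestamp(int, int)`):

```java
import org.apache.flink.table.dataformat.BaseRow;

// Take the rowtime precision as a constructor argument instead of
// hard-coding 3 at the extraction site.
class RowtimeExtractor {
    private final int rowtimeIdx;
    private final int precision;

    RowtimeExtractor(int rowtimeIdx, int precision) {
        this.rowtimeIdx = rowtimeIdx;
        this.precision = precision;
    }

    long extractMillis(BaseRow value) {
        return value.getTimestamp(rowtimeIdx, precision).getMillisecond();
    }
}
```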




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349460825
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   @wuchong exactly.




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349460526
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/TypeInfoDataTypeConverter.java
 ##
 @@ -98,6 +105,20 @@ private static void addDefaultTypeInfo(Class clazz, 
TypeInformation typeIn
}
LogicalType logicalType = fromDataTypeToLogicalType(dataType);
switch (logicalType.getTypeRoot()) {
+   case TIMESTAMP_WITHOUT_TIME_ZONE:
+   TimestampType timestampType = (TimestampType) 
logicalType;
+   int precision = timestampType.getPrecision();
+   if (timestampType.getKind() == 
TimestampKind.REGULAR) {
+   return clazz == SqlTimestamp.class ?
+   new 
SqlTimestampTypeInfo(precision) :
+   (clazz == LocalDateTime.class ?
+   ((3 == precision) ?
 
 Review comment:
   `Types.LOCAL_DATE_TIME` and `Types.SQL_TIMESTAMP` are special in our system. 
We always convert them to `DataTypes.TIMESTAMP(3)` and back (the conversion 
class is different). @wuchong Yes, it is for backward compatibility.
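
Untangling the nested ternary, the mapping rule is roughly the following sketch (the helper name is made up, the type names come from this PR, and the import paths are assumptions):

```java
import java.time.LocalDateTime;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.dataformat.SqlTimestamp;
import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
import org.apache.flink.table.runtime.typeutils.SqlTimestampTypeInfo;

// Only precision 3 maps back to the legacy Types.LOCAL_DATE_TIME; other
// precisions need the precision-aware wrapper types.
static TypeInformation<?> timestampTypeInfo(Class<?> conversionClass, int precision) {
    if (conversionClass == SqlTimestamp.class) {
        return new SqlTimestampTypeInfo(precision);
    }
    if (conversionClass == LocalDateTime.class) {
        return precision == 3
            ? Types.LOCAL_DATE_TIME
            : new LegacyLocalDateTimeTypeInfo(precision);
    }
    throw new IllegalArgumentException("Unexpected conversion class: " + conversionClass);
}
```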




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349460592
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##
 @@ -590,8 +597,7 @@ object CodeGenUtils {
 case DOUBLE => s"$arrayTerm.setNullDouble($index)"
 case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
 case DATE => s"$arrayTerm.setNullInt($index)"
-case TIMESTAMP_WITHOUT_TIME_ZONE |
- TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
+case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
 
 Review comment:
   If you take a look at the code below, won't it fall through to `case _` and 
`setNullLong`?




[GitHub] [flink] flinkbot commented on issue #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
flinkbot commented on issue #10289: [FLINK-14924][table sql / api] 
CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289#issuecomment-557422149
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit e668b7fee390243db8312cc7a4ce3b71fb6f15a3 (Fri Nov 22 
07:23:31 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-14924).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14924:
---
Labels: pull-request-available  (was: )

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> CsvTableSource can not config empty column as null, 
> and converts it to an empty string by default.
>  





[GitHub] [flink] leonardBang opened a new pull request #10289: [FLINK-14924][table sql / api] CsvTableSource can not config empty column as null

2019-11-21 Thread GitBox
leonardBang opened a new pull request #10289: [FLINK-14924][table sql / api] 
CsvTableSource can not config empty column as null
URL: https://github.com/apache/flink/pull/10289
 
 
   CsvTableSource can not config empty column as null.
   
   
   ## What is the purpose of the change
   
   *This pull request adds an option to treat an empty column as null in 
CsvTableSource.*
   
   
   ## Brief change log
 - *update file org.apache.flink.table.sources.CsvTableSource.java*
   
   ## Verifying this change
   Add ITCases to test CsvTableSource:
 - *org.apache.flink.table.runtime.batch.sql.TableSourceITCase.scala*
 - *org.apache.flink.table.runtime.utils.CommonTestData.scala*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): ( no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349459373
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -103,23 +109,28 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
// read the parent fields and the final fields
in.defaultReadObject();
 
-   // the job conf knows how to deserialize itself
-   jobConf = new JobConf();
-   jobConf.readFields(in);
-
try {
hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
-   }
-   catch (Exception e) {
+   } catch (Exception e) {
throw new RuntimeException("Unable to instantiate 
Hadoop InputSplit", e);
}
 
-   if (hadoopInputSplit instanceof Configurable) {
-   ((Configurable) hadoopInputSplit).setConf(this.jobConf);
-   }
-   else if (hadoopInputSplit instanceof JobConfigurable) {
-   ((JobConfigurable) 
hadoopInputSplit).configure(this.jobConf);
+   if (isConfigurable(hadoopInputSplit)) {
+   // the job conf knows how to deserialize itself
+   jobConf = new JobConf();
+   jobConf.readFields(in);
+
+   if (hadoopInputSplit instanceof Configurable) {
+   ((Configurable) 
hadoopInputSplit).setConf(this.jobConf);
 
 Review comment:
   After taking a look at 
`org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit`, I think it is 
right to call `setConf` before `readFields`, because in that class `readFields` 
depends on the `conf`.
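
A simplified model of that dependency (a hypothetical split, not Hive's actual class):

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;

// A split whose readFields() consults the conf: attaching the conf only
// after readFields() would fail here.
class ConfDependentSplit implements Writable, Configurable {
    private Configuration conf;
    private String payload;

    @Override public void setConf(Configuration conf) { this.conf = conf; }
    @Override public Configuration getConf() { return conf; }

    @Override public void write(DataOutput out) throws IOException {
        out.writeUTF(payload);
    }

    @Override public void readFields(DataInput in) throws IOException {
        if (conf == null) {
            throw new IllegalStateException("conf must be set before readFields()");
        }
        payload = in.readUTF();
    }
}
```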




[GitHub] [flink] JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349458534
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -103,23 +109,28 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
// read the parent fields and the final fields
in.defaultReadObject();
 
-   // the job conf knows how to deserialize itself
-   jobConf = new JobConf();
-   jobConf.readFields(in);
-
try {
hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
-   }
-   catch (Exception e) {
+   } catch (Exception e) {
throw new RuntimeException("Unable to instantiate 
Hadoop InputSplit", e);
}
 
-   if (hadoopInputSplit instanceof Configurable) {
-   ((Configurable) hadoopInputSplit).setConf(this.jobConf);
-   }
-   else if (hadoopInputSplit instanceof JobConfigurable) {
-   ((JobConfigurable) 
hadoopInputSplit).configure(this.jobConf);
+   if (isConfigurable(hadoopInputSplit)) {
+   // the job conf knows how to deserialize itself
+   jobConf = new JobConf();
+   jobConf.readFields(in);
+
+   if (hadoopInputSplit instanceof Configurable) {
+   ((Configurable) 
hadoopInputSplit).setConf(this.jobConf);
 
 Review comment:
   The previous code sets the conf first. But I think calling `setConf` after 
`readFields` is dangerous too.




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349458280
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##
 @@ -590,8 +597,7 @@ object CodeGenUtils {
 case DOUBLE => s"$arrayTerm.setNullDouble($index)"
 case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
 case DATE => s"$arrayTerm.setNullInt($index)"
-case TIMESTAMP_WITHOUT_TIME_ZONE |
- TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
+case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
 
 Review comment:
   Why? IMO the behavior of `setNull` for TIMESTAMP should be the same as for 
DECIMAL.




[GitHub] [flink] wuchong commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
wuchong commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349458143
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LegacyLocalDateTimeTypeInfo.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+
+import java.time.LocalDateTime;
+
+/**
+ * {@link TypeInformation} for {@link LocalDateTime}.
+ *
+ * The difference from Types.LOCAL_DATE_TIME is that this TypeInformation 
holds a precision.
+ * Reminder: Conversion from DateType to TypeInformation (and back) exists in
+ * TableSourceUtil.computeIndexMapping, which should be fixed after we remove 
Legacy TypeInformation.
 
 Review comment:
   Could you create a JIRA issue to track this? 




[GitHub] [flink] wuchong commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
wuchong commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349457521
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
 ##
 @@ -44,7 +44,7 @@ public RowtimeProcessFunction(int rowtimeIdx, 
TypeInformation returnTyp
 
@Override
public void processElement(BaseRow value, Context ctx, 
Collector out) throws Exception {
-   long timestamp = value.getLong(rowtimeIdx);
+   long timestamp = value.getTimestamp(rowtimeIdx, 
3).getMillisecond();
 
 Review comment:
   +1 we should pass in the precision too. 




[GitHub] [flink] JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349457137
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -70,8 +77,8 @@ public HadoopInputSplit(int splitNumber, 
org.apache.hadoop.mapred.InputSplit hIn
public String[] getHostnames() {
try {
return this.hadoopInputSplit.getLocations();
-   }
-   catch (IOException e) {
+   } catch (IOException e) {
+   e.printStackTrace();
 
 Review comment:
   Sorry, it was just for debugging. I was wondering why we return `new String[0]` 
here. Later I found that this is a convention of `InputSplit`; they all do this. 
Maybe they consider it just an optimization.




[GitHub] [flink] wuchong commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
wuchong commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349456919
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/TypeInfoDataTypeConverter.java
 ##
 @@ -98,6 +105,20 @@ private static void addDefaultTypeInfo(Class clazz, 
TypeInformation typeIn
}
LogicalType logicalType = fromDataTypeToLogicalType(dataType);
switch (logicalType.getTypeRoot()) {
+   case TIMESTAMP_WITHOUT_TIME_ZONE:
+   TimestampType timestampType = (TimestampType) 
logicalType;
+   int precision = timestampType.getPrecision();
+   if (timestampType.getKind() == 
TimestampKind.REGULAR) {
+   return clazz == SqlTimestamp.class ?
+   new 
SqlTimestampTypeInfo(precision) :
+   (clazz == LocalDateTime.class ?
+   ((3 == precision) ?
 
 Review comment:
   I think this is for backward compatibility, because `Types.LOCAL_DATE_TIME` 
is recognized as `TIMESTAMP(3)` in `LegacyTypeInfoDataTypeConverter`.




[GitHub] [flink] zhuzhurk commented on issue #10283: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
zhuzhurk commented on issue #10283: [FLINK-14735][scheduler] Improve scheduling 
of all-to-all partitions with ALL input constraint for legacy scheduler
URL: https://github.com/apache/flink/pull/10283#issuecomment-557418923
 
 
   @flinkbot run travis




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349456703
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
 ##
 @@ -44,7 +44,7 @@ public RowtimeProcessFunction(int rowtimeIdx, 
TypeInformation returnTyp
 
@Override
public void processElement(BaseRow value, Context ctx, 
Collector out) throws Exception {
-   long timestamp = value.getLong(rowtimeIdx);
+   long timestamp = value.getTimestamp(rowtimeIdx, 
3).getMillisecond();
 
 Review comment:
   I'm afraid it can not.




[jira] [Commented] (FLINK-14872) Potential deadlock for task reading from blocking ResultPartition.

2019-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979921#comment-16979921
 ] 

Jingsong Lee commented on FLINK-14872:
--

Thanks [~kevin.cyj] and [~pnowojski]. Let me provide some test background: when 
we run TPC-DS with 10 TB in batch mode and the batch shuffle type, it is very 
likely to hit the deadlock. Even after several retries it can still occur. 
Perhaps the upstream data volume is so large that it easily fills all buffers 
(even with a large network buffer setting).
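
Until such a limit exists, a hedged mitigation sketch (tuning only, not a fix; option keys as in Flink 1.9):

```java
import org.apache.flink.configuration.Configuration;

// Capping floating buffers bounds how far an InputGate can grow beyond its
// exclusive per-channel buffers, leaving more core buffers for producers.
Configuration conf = new Configuration();
conf.setInteger("taskmanager.network.memory.buffers-per-channel", 2);
conf.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);
```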

> Potential deadlock for task reading from blocking ResultPartition.
> --
>
> Key: FLINK-14872
> URL: https://issues.apache.org/jira/browse/FLINK-14872
> Project: Flink
>  Issue Type: Bug
>Reporter: Yingjie Cao
>Priority: Blocker
> Fix For: 1.10.0
>
>
> Currently, the buffer pool size of an InputGate reading from a blocking 
> ResultPartition is unbounded, which has the potential to use too many buffers; 
> the ResultPartition of the same task may then be unable to acquire enough core 
> buffers, finally leading to deadlock.
> Consider the following case:
> Core buffers are reserved for the InputGate and ResultPartition -> The InputGate 
> consumes lots of Buffers (not including the buffers reserved for the 
> ResultPartition) -> Other tasks acquire exclusive buffers for their InputGates 
> and trigger a redistribution of Buffers (Buffers taken by the previous InputGate 
> cannot be released) -> The first task, whose InputGate uses lots of buffers, 
> begins to emit records but cannot acquire enough core Buffers (some operators 
> may not emit records immediately, or there is just nothing to emit) -> Deadlock.
>  
> I think we can fix this problem by limiting the number of Buffers that can be 
> allocated by an InputGate which reads from a blocking ResultPartition.





[GitHub] [flink] KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349454919
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -70,8 +77,8 @@ public HadoopInputSplit(int splitNumber, 
org.apache.hadoop.mapred.InputSplit hIn
public String[] getHostnames() {
try {
return this.hadoopInputSplit.getLocations();
-   }
-   catch (IOException e) {
+   } catch (IOException e) {
+   e.printStackTrace();
 
 Review comment:
   will this cause lots of messages?




[GitHub] [flink] KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349454800
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -103,23 +109,28 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
// read the parent fields and the final fields
in.defaultReadObject();
 
-   // the job conf knows how to deserialize itself
-   jobConf = new JobConf();
-   jobConf.readFields(in);
-
try {
hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
-   }
-   catch (Exception e) {
+   } catch (Exception e) {
throw new RuntimeException("Unable to instantiate 
Hadoop InputSplit", e);
}
 
-   if (hadoopInputSplit instanceof Configurable) {
-   ((Configurable) hadoopInputSplit).setConf(this.jobConf);
-   }
-   else if (hadoopInputSplit instanceof JobConfigurable) {
-   ((JobConfigurable) 
hadoopInputSplit).configure(this.jobConf);
+   if (isConfigurable(hadoopInputSplit)) {
+   // the job conf knows how to deserialize itself
+   jobConf = new JobConf();
+   jobConf.readFields(in);
+
+   if (hadoopInputSplit instanceof Configurable) {
+   ((Configurable) 
hadoopInputSplit).setConf(this.jobConf);
+   } else if (hadoopInputSplit instanceof JobConfigurable) 
{
+   ((JobConfigurable) 
hadoopInputSplit).configure(this.jobConf);
+   }
}
+
hadoopInputSplit.readFields(in);
}
+
+   private static boolean 
isConfigurable(org.apache.hadoop.mapred.InputSplit split) {
 
 Review comment:
   Changing the method name to `needsJobConf` might be clearer.




[GitHub] [flink] KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] Optimize mapred.HadoopInputSplit to not serialize conf when split is not configurable

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10170: [FLINK-14722][hadoop] 
Optimize mapred.HadoopInputSplit to not serialize conf when split is not 
configurable
URL: https://github.com/apache/flink/pull/10170#discussion_r349455337
 
 

 ##
 File path: 
flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
 ##
 @@ -103,23 +109,28 @@ private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundE
// read the parent fields and the final fields
in.defaultReadObject();
 
-   // the job conf knows how to deserialize itself
-   jobConf = new JobConf();
-   jobConf.readFields(in);
-
try {
hadoopInputSplit = 
(org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
-   }
-   catch (Exception e) {
+   } catch (Exception e) {
throw new RuntimeException("Unable to instantiate 
Hadoop InputSplit", e);
}
 
-   if (hadoopInputSplit instanceof Configurable) {
-   ((Configurable) hadoopInputSplit).setConf(this.jobConf);
-   }
-   else if (hadoopInputSplit instanceof JobConfigurable) {
-   ((JobConfigurable) 
hadoopInputSplit).configure(this.jobConf);
+   if (isConfigurable(hadoopInputSplit)) {
+   // the job conf knows how to deserialize itself
+   jobConf = new JobConf();
+   jobConf.readFields(in);
+
+   if (hadoopInputSplit instanceof Configurable) {
+   ((Configurable) 
hadoopInputSplit).setConf(this.jobConf);
 
 Review comment:
   Is it OK to configure this `hadoopInputSplit` before 
`hadoopInputSplit.readFields(in)` has run? That ordering looks dangerous to me. 




[GitHub] [flink] flinkbot edited a comment on issue #10284: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10284: [FLINK-14735][scheduler] Improve 
scheduling of all-to-all partitions with ALL input constraint for legacy 
scheduler
URL: https://github.com/apache/flink/pull/10284#issuecomment-557162273
 
 
   
   ## CI report:
   
   * 55bb7f0c227441d894acdfdea48610e4ccdbf12c : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137619209)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] wuchong commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
wuchong commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349454718
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   Is this the same reason why we introduced `LegacyLocalDateTimeTypeInfo` and 
`LegacyTimestampTypeInfo`?




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349454379
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   Please don't forget to add a reference to this JIRA issue in the code comment too. Thanks.
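   For example, the TODO in FlinkSqlOperatorTable could carry the tracking issue created for this (FLINK-14925, filed below); a hypothetical form:
   
       // TODO: FLINK-14925 - the return type of TO_TIMESTAMP should be TIMESTAMP(9)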




[jira] [Commented] (FLINK-14902) JDBCTableSource support AsyncLookupFunction

2019-11-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979912#comment-16979912
 ] 

Jingsong Lee commented on FLINK-14902:
--

Hi [~hailong wang], I am very glad you want to give it a try. As [~jark] said, JDBC 
doesn't provide async methods, so you may have to implement the asynchrony yourself 
by launching multiple threads, or perhaps batch the queries to improve 
performance.
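
A minimal sketch of the multi-thread approach: a dedicated pool runs the blocking JDBC query and completes a CompletableFuture with the rows (the JDBC URL, table and column names, and the connection-per-call strategy are illustrative assumptions, not connector API):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncJdbcLookup {
        // blocking JDBC calls run on this pool instead of the caller's thread
        private final ExecutorService pool = Executors.newFixedThreadPool(4);
        private final String jdbcUrl; // illustrative, e.g. "jdbc:mysql://..."

        public AsyncJdbcLookup(String jdbcUrl) {
            this.jdbcUrl = jdbcUrl;
        }

        public CompletableFuture<List<String>> lookupAsync(long key) {
            return CompletableFuture.supplyAsync(() -> {
                // one connection per call keeps the sketch trivially thread-safe;
                // a real implementation would pool and reuse connections
                try (Connection conn = DriverManager.getConnection(jdbcUrl);
                        PreparedStatement ps = conn.prepareStatement(
                                "SELECT name FROM dim_table WHERE id = ?")) {
                    ps.setLong(1, key);
                    List<String> rows = new ArrayList<>();
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            rows.add(rs.getString(1));
                        }
                    }
                    return rows;
                } catch (Exception e) {
                    throw new RuntimeException("Lookup failed for key " + key, e);
                }
            }, pool);
        }
    }

Batching could then amount to collecting several keys and issuing one IN (...) query per pool task instead of one query per key.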

> JDBCTableSource support AsyncLookupFunction
> ---
>
> Key: FLINK-14902
> URL: https://issues.apache.org/jira/browse/FLINK-14902
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.9.0
>Reporter: hailong wang
>Assignee: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> JDBCTableSource support AsyncLookupFunction





[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349451095
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -188,10 +199,76 @@ public void start() {
// nothing to do yet
}
 
+   /**
+* Create a new {@link ExecutionContext} by merging the default environment and the environment in the session context.
+*/
+   private ExecutionContext createExecutionContext(SessionContext 
sessionContext) {
+   return createExecutionContext(defaultEnvironment, 
sessionContext.getSessionEnv());
+   }
+
+   /**
+* Create a new {@link ExecutionContext} by merging the default 
environment and session environment.
+*/
+   private ExecutionContext createExecutionContext(Environment 
defaultEnv, Environment sessionEnv) {
+   Environment mergedEnv = Environment.merge(defaultEnv, 
sessionEnv);
+   return createExecutionContext(mergedEnv);
+   }
+
+   /**
+* Create a new {@link ExecutionContext} by using the given environment.
+*/
+   private ExecutionContext createExecutionContext(Environment 
environment) {
+   try {
+   return new ExecutionContext<>(
+   environment,
+   dependencies,
+   flinkConfig,
+   clusterClientServiceLoader,
+   commandLineOptions,
+   commandLines);
+   } catch (Throwable t) {
+   // catch everything such that a configuration does not 
crash the executor
+   throw new SqlExecutionException("Could not create 
execution context.", t);
+   }
+   }
+
@Override
-   public Map getSessionProperties(SessionContext session) 
throws SqlExecutionException {
-   final Environment env = getOrCreateExecutionContext(session)
-   .getMergedEnvironment();
+   public String openSession(SessionContext sessionContext) throws 
SqlExecutionException {
+   String sessionId = sessionContext.getSessionId();
+   ExecutionContext previousContext = 
this.contextMap.putIfAbsent(sessionId, createExecutionContext(sessionContext));
+   if (previousContext != null) {
+   throw new SqlExecutionException("Found another session 
with the same session identifier: " + sessionContext.getSessionId());
 
 Review comment:
   OK




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349451137
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -188,10 +199,76 @@ public void start() {
// nothing to do yet
}
 
+   /**
+* Create a new {@link ExecutionContext} by merging the default environment and the environment in the session context.
+*/
+   private ExecutionContext createExecutionContext(SessionContext 
sessionContext) {
+   return createExecutionContext(defaultEnvironment, 
sessionContext.getSessionEnv());
+   }
+
+   /**
+* Create a new {@link ExecutionContext} by merging the default 
environment and session environment.
+*/
+   private ExecutionContext createExecutionContext(Environment 
defaultEnv, Environment sessionEnv) {
+   Environment mergedEnv = Environment.merge(defaultEnv, 
sessionEnv);
+   return createExecutionContext(mergedEnv);
+   }
+
+   /**
+* Create a new {@link ExecutionContext} by using the given environment.
+*/
+   private ExecutionContext createExecutionContext(Environment 
environment) {
+   try {
+   return new ExecutionContext<>(
+   environment,
+   dependencies,
+   flinkConfig,
+   clusterClientServiceLoader,
+   commandLineOptions,
+   commandLines);
+   } catch (Throwable t) {
+   // catch everything such that a configuration does not 
crash the executor
+   throw new SqlExecutionException("Could not create 
execution context.", t);
+   }
+   }
+
@Override
-   public Map getSessionProperties(SessionContext session) 
throws SqlExecutionException {
-   final Environment env = getOrCreateExecutionContext(session)
-   .getMergedEnvironment();
+   public String openSession(SessionContext sessionContext) throws 
SqlExecutionException {
+   String sessionId = sessionContext.getSessionId();
+   ExecutionContext previousContext = 
this.contextMap.putIfAbsent(sessionId, createExecutionContext(sessionContext));
+   if (previousContext != null) {
+   throw new SqlExecutionException("Found another session 
with the same session identifier: " + sessionContext.getSessionId());
+   }
+   return sessionContext.getSessionId();
 
 Review comment:
   OK.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349453119
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -347,14 +441,16 @@ public String explainStatement(SessionContext session, 
String statement) throws
}
 
@Override
-   public ResultDescriptor executeQuery(SessionContext session, String 
query) throws SqlExecutionException {
-   final ExecutionContext context = 
getOrCreateExecutionContext(session);
-   return executeQueryInternal(context, query);
+   public ResultDescriptor executeQuery(String sessionId, String query) 
throws SqlExecutionException {
+   final ExecutionContext context = 
getExecutionContext(sessionId);
+   return executeQueryInternal(sessionId, context, query);
}
 
@Override
-   public TypedResult>> 
retrieveResultChanges(SessionContext session,
-   String resultId) throws SqlExecutionException {
+   public TypedResult>> retrieveResultChanges(
+   String sessionId,
+   String resultId
+   ) throws SqlExecutionException {
 
 Review comment:
   OK.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349452604
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -200,69 +277,98 @@ public void start() {
}
 
@Override
-   public List listCatalogs(SessionContext session) throws 
SqlExecutionException {
-   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
+   // Renew the ExecutionContext by using the default environment.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment));
 
 Review comment:
   Yeah, I noticed this. I had planned not to save the session environment in the 
ExecutionContext, so right now there is no way to reset back to the session env. 
Let me rethink the design; maybe we still need to keep the session env.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349452869
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -200,69 +277,98 @@ public void start() {
}
 
@Override
-   public List listCatalogs(SessionContext session) throws 
SqlExecutionException {
-   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
+   // Renew the ExecutionContext by using the default environment.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment));
+   }
 
-   final TableEnvironment tableEnv = context
-   .createEnvironmentInstance()
-   .getTableEnvironment();
+   @Override
+   public void setSessionProperty(String sessionId, String key, String 
value) throws SqlExecutionException {
+   Environment env = 
getExecutionContext(sessionId).getEnvironment();
+   Environment newEnv = Environment.enrich(env, 
ImmutableMap.of(key, value), ImmutableMap.of());
+   // Renew the ExecutionContext by merging the default 
environment and new environment.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment, newEnv));
+   }
 
-   return context.wrapClassLoader(() -> 
Arrays.asList(tableEnv.listCatalogs()));
+   @Override
+   public void addView(String sessionId, String name, String query) throws 
SqlExecutionException {
+   Environment env = 
getExecutionContext(sessionId).getEnvironment();
+   Environment newEnv = Environment.enrich(
+   env,
+   ImmutableMap.of(),
+   ImmutableMap.of(name, ViewEntry.create(name, query)));
+   // Renew the ExecutionContext.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment, newEnv));
 
 Review comment:
   Will also change this.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349450962
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -171,12 +175,19 @@ public LocalExecutor(URL defaultEnv, List jars, 
List libraries) {
/**
 * Constructor for testing purposes.
 */
-   public LocalExecutor(Environment defaultEnvironment, List 
dependencies, Configuration flinkConfig, CustomCommandLine commandLine, 
ClusterClientServiceLoader clusterClientServiceLoader) {
+   public LocalExecutor(
+   Environment defaultEnvironment,
+   List dependencies,
+   Configuration flinkConfig,
+   CustomCommandLine commandLine,
+   ClusterClientServiceLoader clusterClientServiceLoader
+   ) {
 
 Review comment:
   OK




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349448931
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -111,71 +113,56 @@
  */
 public class ExecutionContext {
 
-   private final SessionContext sessionContext;
-   private final Environment mergedEnv;
+   private final Environment environment;
private final ClassLoader classLoader;
-   private final Map modules;
-   private final Map catalogs;
-   private final Map> tableSources;
-   private final Map> tableSinks;
-   private final Map functions;
+
private final Configuration flinkConfig;
private final Configuration executorConfig;
private final ClusterClientFactory clusterClientFactory;
private final ExecutionConfigAccessor executionParameters;
private final ClusterID clusterId;
private final ClusterSpecification clusterSpec;
 
-   public ExecutionContext(Environment defaultEnvironment, SessionContext 
sessionContext, List dependencies,
-   Configuration flinkConfig, Options 
commandLineOptions, List availableCommandLines) throws 
FlinkException {
-   this(defaultEnvironment, sessionContext, dependencies, 
flinkConfig, new DefaultClusterClientServiceLoader(), commandLineOptions, 
availableCommandLines);
+   private TableEnvironment tableEnv;
+   private ExecutionEnvironment execEnv;
+   private StreamExecutionEnvironment streamExecEnv;
+   private Executor executor;
+
+   public ExecutionContext(
+   Environment environment,
 
 Review comment:
   I've set up checkstyle in my IDE; it seems the Selection-format action makes the 
indent happen again. Let me check.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349452821
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -200,69 +277,98 @@ public void start() {
}
 
@Override
-   public List listCatalogs(SessionContext session) throws 
SqlExecutionException {
-   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   public void resetSessionProperties(String sessionId) throws 
SqlExecutionException {
+   // Renew the ExecutionContext by using the default environment.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment));
+   }
 
-   final TableEnvironment tableEnv = context
-   .createEnvironmentInstance()
-   .getTableEnvironment();
+   @Override
+   public void setSessionProperty(String sessionId, String key, String 
value) throws SqlExecutionException {
+   Environment env = 
getExecutionContext(sessionId).getEnvironment();
+   Environment newEnv = Environment.enrich(env, 
ImmutableMap.of(key, value), ImmutableMap.of());
+   // Renew the ExecutionContext by merging the default 
environment and new environment.
+   this.contextMap.put(sessionId, 
createExecutionContext(defaultEnvironment, newEnv));
 
 Review comment:
   Sounds good, thanks for pointing it out.




[GitHub] [flink] openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on a change in pull request #10270: [FLINK-14672][sql-client] 
Make Executor stateful in sql client
URL: https://github.com/apache/flink/pull/10270#discussion_r349448946
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -111,71 +113,56 @@
  */
 public class ExecutionContext {
 
-   private final SessionContext sessionContext;
-   private final Environment mergedEnv;
+   private final Environment environment;
private final ClassLoader classLoader;
-   private final Map modules;
-   private final Map catalogs;
-   private final Map> tableSources;
-   private final Map> tableSinks;
-   private final Map functions;
+
private final Configuration flinkConfig;
private final Configuration executorConfig;
private final ClusterClientFactory clusterClientFactory;
private final ExecutionConfigAccessor executionParameters;
private final ClusterID clusterId;
private final ClusterSpecification clusterSpec;
 
-   public ExecutionContext(Environment defaultEnvironment, SessionContext 
sessionContext, List dependencies,
-   Configuration flinkConfig, Options 
commandLineOptions, List availableCommandLines) throws 
FlinkException {
-   this(defaultEnvironment, sessionContext, dependencies, 
flinkConfig, new DefaultClusterClientServiceLoader(), commandLineOptions, 
availableCommandLines);
+   private TableEnvironment tableEnv;
+   private ExecutionEnvironment execEnv;
+   private StreamExecutionEnvironment streamExecEnv;
+   private Executor executor;
+
+   public ExecutionContext(
+   Environment environment,
+   List dependencies,
+   Configuration flinkConfig,
+   Options commandLineOptions,
+   List availableCommandLines) throws 
FlinkException {
+   this(environment,
 
 Review comment:
   OK.




[GitHub] [flink] KurtYoung commented on a change in pull request #10271: [FLINK-14874] [table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE case

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10271: [FLINK-14874] 
[table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE 
case
URL: https://github.com/apache/flink/pull/10271#discussion_r349452471
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalAggRuleBase.scala
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecExchange, 
BatchExecExpand, BatchExecGroupAggregateBase, BatchExecHashAggregate, 
BatchExecLocalHashAggregate, BatchExecLocalSortAggregate, 
BatchExecSortAggregate}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
FlinkRelOptUtil}
+import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.RexUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites a one-phase aggregate into a two-phase aggregate
+  * when the following conditions are met:
+  * 1. there is no local aggregate,
+  * 2. the aggregate has non-empty grouping and the two-phase aggregate strategy is enabled,
+  * 3. the input is [[BatchExecExpand]] and there is at least one expand row
+  * in which the grouping columns are all constant.
+  */
+abstract class EnforceLocalAggRuleBase(
+operand: RelOptRuleOperand,
+description: String)
+  extends RelOptRule(operand, description)
+  with BatchExecAggRuleBase {
+
+  protected def getBatchExecExpand(call: RelOptRuleCall): BatchExecExpand
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: BatchExecGroupAggregateBase = call.rel(0)
+val expand = getBatchExecExpand(call)
+
+val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(agg)
+val aggFunctions = agg.getAggCallToAggFunction.map(_._2).toArray
+val enableTwoPhaseAgg = isTwoPhaseAggWorkable(aggFunctions, tableConfig)
+
+val grouping = agg.getGrouping
+// if all group columns in an expand row are constant, this row will be shuffled to
+// a single node (the shuffle keys are the grouping columns);
+// add local aggregate to greatly reduce the output data
+val hasConstantRow = expand.projects.exists {
+  project =>
+val groupingColumns = grouping.map(i => project.get(i))
+groupingColumns.forall(RexUtil.isConstant)
+}
+
+grouping.nonEmpty && enableTwoPhaseAgg && hasConstantRow
+  }
+
+  protected def createLocalAgg(
 
 Review comment:
   Could we reuse the existing logic for creating a local aggregate from the two-phase aggregate rules here?




[GitHub] [flink] KurtYoung commented on a change in pull request #10271: [FLINK-14874] [table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE case

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10271: [FLINK-14874] 
[table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE 
case
URL: https://github.com/apache/flink/pull/10271#discussion_r349451853
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalAggRuleBase.scala
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecExchange, 
BatchExecExpand, BatchExecGroupAggregateBase, BatchExecHashAggregate, 
BatchExecLocalHashAggregate, BatchExecLocalSortAggregate, 
BatchExecSortAggregate}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
FlinkRelOptUtil}
+import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.RexUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites a one-phase aggregate into a two-phase aggregate
+  * when the following conditions are met:
+  * 1. there is no local aggregate,
+  * 2. the aggregate has non-empty grouping and the two-phase aggregate strategy is enabled,
+  * 3. the input is [[BatchExecExpand]] and there is at least one expand row
+  * in which the grouping columns are all constant.
+  */
+abstract class EnforceLocalAggRuleBase(
+operand: RelOptRuleOperand,
+description: String)
+  extends RelOptRule(operand, description)
+  with BatchExecAggRuleBase {
+
+  protected def getBatchExecExpand(call: RelOptRuleCall): BatchExecExpand
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: BatchExecGroupAggregateBase = call.rel(0)
+val expand = getBatchExecExpand(call)
+
+val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(agg)
+val aggFunctions = agg.getAggCallToAggFunction.map(_._2).toArray
+val enableTwoPhaseAgg = isTwoPhaseAggWorkable(aggFunctions, tableConfig)
+
+val grouping = agg.getGrouping
+// if all group columns in an expand row are constant, this row will be shuffled to
+// a single node (the shuffle keys are the grouping columns);
+// add local aggregate to greatly reduce the output data
+val hasConstantRow = expand.projects.exists {
+  project =>
+val groupingColumns = grouping.map(i => project.get(i))
+groupingColumns.forall(RexUtil.isConstant)
+}
+
+grouping.nonEmpty && enableTwoPhaseAgg && hasConstantRow
 
 Review comment:
   If the grouping is empty, all rows end up on a single node as well, so local aggregation also seems effective there?




[GitHub] [flink] KurtYoung commented on a change in pull request #10271: [FLINK-14874] [table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE case

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10271: [FLINK-14874] 
[table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE 
case
URL: https://github.com/apache/flink/pull/10271#discussion_r349452562
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalAggRuleBase.scala
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecExchange, 
BatchExecExpand, BatchExecGroupAggregateBase, BatchExecHashAggregate, 
BatchExecLocalHashAggregate, BatchExecLocalSortAggregate, 
BatchExecSortAggregate}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
FlinkRelOptUtil}
+import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.RexUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites a one-phase aggregate into a two-phase aggregate
+  * when the following conditions are met:
+  * 1. there is no local aggregate,
+  * 2. the aggregate has non-empty grouping and the two-phase aggregate strategy is enabled,
+  * 3. the input is [[BatchExecExpand]] and there is at least one expand row
+  * in which the grouping columns are all constant.
+  */
+abstract class EnforceLocalAggRuleBase(
+operand: RelOptRuleOperand,
+description: String)
+  extends RelOptRule(operand, description)
+  with BatchExecAggRuleBase {
+
+  protected def getBatchExecExpand(call: RelOptRuleCall): BatchExecExpand
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: BatchExecGroupAggregateBase = call.rel(0)
+val expand = getBatchExecExpand(call)
+
+val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(agg)
+val aggFunctions = agg.getAggCallToAggFunction.map(_._2).toArray
+val enableTwoPhaseAgg = isTwoPhaseAggWorkable(aggFunctions, tableConfig)
+
+val grouping = agg.getGrouping
+// if all group columns in an expand row are constant, this row will be shuffled to
+// a single node (the shuffle keys are the grouping columns);
+// add local aggregate to greatly reduce the output data
+val hasConstantRow = expand.projects.exists {
+  project =>
+val groupingColumns = grouping.map(i => project.get(i))
+groupingColumns.forall(RexUtil.isConstant)
+}
+
+grouping.nonEmpty && enableTwoPhaseAgg && hasConstantRow
+  }
+
+  protected def createLocalAgg(
+  completeAgg: BatchExecGroupAggregateBase,
+  input: RelNode,
+  relBuilder: RelBuilder): BatchExecGroupAggregateBase = {
+val cluster = completeAgg.getCluster
+val inputRowType = input.getRowType
+
+val grouping = completeAgg.getGrouping
+val auxGrouping = completeAgg.getAuxGrouping
+val aggCalls = completeAgg.getAggCallList
+val aggCallToAggFunction = completeAgg.getAggCallToAggFunction
+
+val (_, aggBufferTypes, aggFunctions) = 
AggregateUtil.transformToBatchAggregateFunctions(
+  aggCalls, inputRowType)
+
+val typeFactory = 
completeAgg.getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+val aggCallNames = Util.skip(
+  completeAgg.getRowType.getFieldNames, grouping.length + 
auxGrouping.length).toList.toArray
+
+val localAggRowType = inferLocalAggType(
+  inputRowType,
+  typeFactory,
+  aggCallNames,
+  grouping,
+  auxGrouping,
+  aggFunctions,
+  aggBufferTypes.map(_.map(fromDataTypeToLogicalType)))
+
+val traitSet = cluster.getPlanner
+  .emptyTraitSet
+  .replace(FlinkConventions.BATCH_PHYSICAL)
+
+completeAgg match 

[GitHub] [flink] KurtYoung commented on a change in pull request #10271: [FLINK-14874] [table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE case

2019-11-21 Thread GitBox
KurtYoung commented on a change in pull request #10271: [FLINK-14874] 
[table-planner-blink] add local aggregate to solve data skew for ROLLUP/CUBE 
case
URL: https://github.com/apache/flink/pull/10271#discussion_r349451475
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalAggRuleBase.scala
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecExchange, 
BatchExecExpand, BatchExecGroupAggregateBase, BatchExecHashAggregate, 
BatchExecLocalHashAggregate, BatchExecLocalSortAggregate, 
BatchExecSortAggregate}
+import org.apache.flink.table.planner.plan.utils.{AggregateUtil, 
FlinkRelOptUtil}
+import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.RexUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that rewrites a one-phase aggregate into a two-phase aggregate
+  * when the following conditions are met:
+  * 1. there is no local aggregate,
+  * 2. the aggregate has non-empty grouping and the two-phase aggregate strategy is enabled,
+  * 3. the input is [[BatchExecExpand]] and there is at least one expand row
+  * in which the grouping columns are all constant.
+  */
+abstract class EnforceLocalAggRuleBase(
+operand: RelOptRuleOperand,
+description: String)
+  extends RelOptRule(operand, description)
+  with BatchExecAggRuleBase {
+
+  protected def getBatchExecExpand(call: RelOptRuleCall): BatchExecExpand
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg: BatchExecGroupAggregateBase = call.rel(0)
 
 Review comment:
   It's better to leave the `matches` method to the child classes. It's hard to 
tell what `call.rel(0)` will be here, since we don't know the operand pattern in 
the base class. You can find other ways to reuse the common code, though. 




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349450409
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/MethodCallGen.scala
 ##
 @@ -34,13 +33,14 @@ class MethodCallGen(method: Method) extends CallGenerator {
   returnType: LogicalType): GeneratedExpression = {
 generateCallIfArgsNotNull(ctx, returnType, operands, 
!method.getReturnType.isPrimitive) {
   originalTerms => {
-val terms = originalTerms.zip(method.getParameterTypes).map { case 
(term, clazz) =>
-  // convert the BinaryString parameter to String if the method 
parameter accept String
-  if (clazz == classOf[String]) {
-s"$term.toString()"
-  } else {
-term
-  }
+val terms = 
originalTerms.zipWithIndex.zip(method.getParameterTypes).map {
 
 Review comment:
   What do you want to change?




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349451016
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -191,12 +191,24 @@ object ScalarOperatorGens {
 
   case (TIMESTAMP_WITHOUT_TIME_ZONE, INTERVAL_DAY_TIME) =>
 generateOperatorIfNotNull(ctx, left.resultType, left, right) {
-  (l, r) => s"$l $op $r"
+  (l, r) => {
+val leftTerm = s"$l.getMillisecond()"
+val nanoTerm = s"$l.getNanoOfMillisecond()"
+s"$SQL_TIMESTAMP.fromEpochMillis($leftTerm $op $r, $nanoTerm)"
 
 Review comment:
   Do you think we can add some utility methods to `SqlTimestamp`, like `plus(millis)`, 
etc.?
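   A minimal sketch of such a utility, assuming the existing `fromEpochMillis(long, int)` factory and the `getMillisecond()`/`getNanoOfMillisecond()` accessors used in the generated code above (the `plus` method itself is hypothetical):
   
       // shift the timestamp by some milliseconds while preserving the
       // sub-millisecond nanos
       public SqlTimestamp plus(long millis) {
           return SqlTimestamp.fromEpochMillis(getMillisecond() + millis, getNanoOfMillisecond());
       }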




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349451884
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 ##
 @@ -783,22 +826,35 @@ object ScalarOperatorGens {
   operand.resultType.asInstanceOf[TimestampType].getKind == 
TimestampKind.ROWTIME ||
   targetType.asInstanceOf[TimestampType].getKind == 
TimestampKind.PROCTIME ||
   targetType.asInstanceOf[TimestampType].getKind == 
TimestampKind.ROWTIME =>
-  operand.copy(resultType = new TimestampType(3)) // just replace the 
DataType
+operand.copy(resultType = new TimestampType(3)) // just replace the 
DataType
+
+case (TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE) =>
+  val fromType = operand.resultType.asInstanceOf[TimestampType]
+  val toType = targetType.asInstanceOf[TimestampType]
+  if (fromType.getPrecision <= toType.getPrecision) {
+operand
 
 Review comment:
   `operand.copy(targetType)`?




[GitHub] [flink] flinkbot edited a comment on issue #10235: [FLINK-14839][config] Let JobGraph#classpaths become non-null

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10235: [FLINK-14839][config] Let 
JobGraph#classpaths become non-null
URL: https://github.com/apache/flink/pull/10235#issuecomment-554960500
 
 
   
   ## CI report:
   
   * 15f1c1aa065c1135098b8cec5a52200c82675243 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/136972297)
   * 30ef6bcb50459ea4efcd4d9c4091436ad448d6a8 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137310642)
   * 7c0af1f6f2ca9504528d7d3f458449a100faf184 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137702770)
   * 71f07dc72c9590bef027993dc258e598137514a7 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/137709302)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] zhuzhurk commented on issue #10284: [FLINK-14735][scheduler] Improve scheduling of all-to-all partitions with ALL input constraint for legacy scheduler

2019-11-21 Thread GitBox
zhuzhurk commented on issue #10284: [FLINK-14735][scheduler] Improve scheduling 
of all-to-all partitions with ALL input constraint for legacy scheduler
URL: https://github.com/apache/flink/pull/10284#issuecomment-557411317
 
 
   @flinkbot run travis




[jira] [Closed] (FLINK-14886) Wrong result in scalar query using blink planner

2019-11-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14886.
--
Fix Version/s: 1.10.0
 Assignee: Leonard Xu
   Resolution: Duplicate

The reason is FLINK-14924

> Wrong result in scalar query using blink planner
> 
>
> Key: FLINK-14886
> URL: https://issues.apache.org/jira/browse/FLINK-14886
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Blocker
> Fix For: 1.10.0
>
> Attachments: Flink.png, screenshot-1.png, screenshot-2.png
>
>
> Sorry for closing this issue; I couldn't find the precise root cause. 
> But I have reached some conclusions, summarized below, and I'll open a new issue 
> if I make progress. 
> !screenshot-2.png!  
> !Flink.png! 





[jira] [Commented] (FLINK-14902) JDBCTableSource support AsyncLookupFunction

2019-11-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979902#comment-16979902
 ] 

Jark Wu commented on FLINK-14902:
-

Sure [~hailong wang]. Could you share a bit about how you plan to support 
async lookup? JDBC doesn't provide async methods. 

> JDBCTableSource support AsyncLookupFunction
> ---
>
> Key: FLINK-14902
> URL: https://issues.apache.org/jira/browse/FLINK-14902
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.9.0
>Reporter: hailong wang
>Assignee: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> JDBCTableSource support AsyncLookupFunction





[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349450401
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   tracked  by https://issues.apache.org/jira/browse/FLINK-14925




[jira] [Closed] (FLINK-14900) output contains null value when a is null in pattern where a <> 'String'

2019-11-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14900.
--
  Assignee: Leonard Xu
Resolution: Duplicate

The reason is FLINK-14924

>  output contains null value  when a is null in pattern where a <> 'String' 
> ---
>
> Key: FLINK-14900
> URL: https://issues.apache.org/jira/browse/FLINK-14900
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> The result of TPC-DS query 19 is incorrect. I analyzed it and simplified the SQL as 
> follows:
> {code:java}
> // simplify query 19
> select top 100 ca_zip,s_zip,substring(ca_zip,1,5), substring(s_zip,1,5) 
>  from customer_address,store
>where substring(ca_zip,1,5) <> substring(s_zip,1,5) 
>   order by ca_zip,s_zip{code}
>  
> flink output:
> {code:java}
> null|31904|null|31904
> null|31904|null|31904
> null|31904|null|31904
> null|31904|null|31904{code}
> SQL server output:
> {code:java}
> 00601      31904      00601 31904
> 00601      31904      00601 31904
> 00601      31904  00601 31904
> 00601      31904      00601 31904
> {code}
> This issue may influence the results of TPC-DS queries 46 and 68.
> Another similar pattern, "where a = b" in query 31, will produce null 
> values too.
>  
> {code:java}
> // simplify query 31
> select 
> ss1.ca_county
>,ss1.d_year
>,ss2.store_sales/ss1.store_sales store_q1_q2_increase   
>  from
> ss ss1
>,ss ss2  
>  where
> ss1.d_qoy = 1
> and ss1.d_year = 2000
> and ss1.ca_county = ss2.ca_county
> and ss2.d_qoy = 2
> and ss2.d_year = 2000
>   order by ss1.ca_county{code}
>  
> Flink output :
>  
> {code:java}
> |2000|0.845635
> Acadia Parish|2000|1.362160
> Accomack County|2000|0.650251
> {code}
>  
> SQL server output:
>  
> {code:java}
> Acadia Parish|2000|1.362159
> Accomack County|2000|0.650250
> {code}
>  
>  
> I tested with an expression test and a SQL ITCase; they return the correct value.
> {code:java}
> @Test
> def testSubStr(): Unit = {
>   checkQuery(
> Seq[(String, String, Double)]((null, "00601" , 100d), ("00501", "00601", 
> 50d)),
> "select f0, substring(f0,1,5) from  Table1 where substring(f0,1,5)  <> f1 
> ",
> Seq(("00501", "00501"))
>   )
> }
> {code}
> So I need to dig into this issue more. 
>  
>  





[jira] [Assigned] (FLINK-14902) JDBCTableSource support AsyncLookupFunction

2019-11-21 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-14902:
---

Assignee: hailong wang

> JDBCTableSource support AsyncLookupFunction
> ---
>
> Key: FLINK-14902
> URL: https://issues.apache.org/jira/browse/FLINK-14902
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Affects Versions: 1.9.0
>Reporter: hailong wang
>Assignee: hailong wang
>Priority: Major
> Fix For: 1.10.0
>
>
> JDBCTableSource support AsyncLookupFunction





[jira] [Closed] (FLINK-14895) same rank in rollup row with group row that all group key are null

2019-11-21 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14895.
--
  Assignee: Leonard Xu
Resolution: Duplicate

The reason is FLINK-14924

> same rank in rollup row with group row that all group key are null 
> ---
>
> Key: FLINK-14895
> URL: https://issues.apache.org/jira/browse/FLINK-14895
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Assignee: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> The rollup row gets the same rank as a group row whose group keys are all null; in 
> TPC-DS query 67, the ranks in our result differ from those of other DB systems.
> SQL:
> {code:java}
> select  *
> from (select i_category
> ,i_class
> ,i_brand
> ,i_product_name
> ,d_year
> ,d_qoy
> ,d_moy
> ,s_store_id
> ,sumsales
> ,rank() over (partition by i_category order by sumsales desc) rk
>   from (select i_category
>   ,i_class
>   ,i_brand
>   ,i_product_name
>   ,d_year
>   ,d_qoy
>   ,d_moy
>   ,s_store_id
>   ,sum(coalesce(ss_sales_price*ss_quantity,0)) sumsales
> from store_sales
> ,date_dim
> ,store
> ,item
>where  ss_sold_date_sk=d_date_sk
>   and ss_item_sk=i_item_sk
>   and ss_store_sk = s_store_sk
>   and d_month_seq between 1200 and 1200+11
>group by  rollup(i_category, i_class, i_brand, i_product_name, d_year, 
> d_qoy, d_moy,s_store_id))dw1) dw2
> where rk <= 100
> order by i_category
> ,i_class
> ,i_brand
> ,i_product_name
> ,d_year
> ,d_qoy
> ,d_moy
> ,s_store_id
> ,sumsales
> ,rk
> limit 100
> {code}
> correct result:
> {code:java}
> NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|596191.74|4
> NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|1628997.00|3
> NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|3113996.92|2
> NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|1019789218.69|1  //rollup row
> {code}
> our result:
> {code:java}
> 1019789218.69|1  //rollup row
> 3113996.92|1
> 1628997.00|2
> 596191.74|3
> {code}
>  
>  





[GitHub] [flink] flinkbot edited a comment on issue #10287: [FLINK-14915][runtime] Remove the unnecessary param JobGraph from SchedulingStrategyFactory#createInstance

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10287: [FLINK-14915][runtime] Remove the 
unnecessary param JobGraph from SchedulingStrategyFactory#createInstance
URL: https://github.com/apache/flink/pull/10287#issuecomment-557403940
 
 
   
   ## CI report:
   
   * 91a9712e38f270006e6577e2976deb9993bb8243 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137712205)
   
   




[jira] [Created] (FLINK-14925) the return type of TO_TIMESTAMP should be Timestamp(9) instead of Timestamp(3)

2019-11-21 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-14925:


 Summary: the return type of TO_TIMESTAMP should be Timestamp(9) 
instead of Timestamp(3)
 Key: FLINK-14925
 URL: https://issues.apache.org/jira/browse/FLINK-14925
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Zhenghua Gao








[GitHub] [flink] flinkbot edited a comment on issue #10288: [FLINK-14765] Remove STATE_UPDATER in Execution

2019-11-21 Thread GitBox
flinkbot edited a comment on issue #10288: [FLINK-14765] Remove STATE_UPDATER 
in Execution
URL: https://github.com/apache/flink/pull/10288#issuecomment-557403970
 
 
   
   ## CI report:
   
   * 018cc229676d6c6634ce4113342f46b42eb24416 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/137712216)
   
   




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349449435
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   org.apache.flink.table.api.TableException: Unsupported conversion from data 
type 'TIMESTAMP(9)' (conversion class: java.time.LocalDateTime) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.
   
at 
org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:246)
at 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:55)
at 
org.apache.flink.table.api.TableSchema.getFieldTypes(TableSchema.java:107)
at 
org.apache.flink.table.descriptors.DescriptorProperties.putTableSchema(DescriptorProperties.java:200)
at 
org.apache.flink.table.catalog.CatalogTableImpl.toProperties(CatalogTableImpl.java:75)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:124)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:264)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:182)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:104)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:166)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:102)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:448)
at 
org.apache.flink.table.planner.catalog.CatalogTableITCase.testInsertSourceTableWithFuncField(CatalogTableITCase.scala:323)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
   

[jira] [Commented] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979898#comment-16979898
 ] 

Kurt Young commented on FLINK-14924:


We met this problem before in blink; you can check this code: 
[https://github.com/apache/flink/blob/blink/flink-java/src/main/java/org/apache/flink/api/java/io/AbstractRowCsvInputFormat.java]

We added an `emptyColumnAsNull` option to the CSV input format in blink. 
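
A minimal sketch (not blink's actual implementation) of what such an option
does at field-parsing time:

{code:java}
public class EmptyColumnDemo {
    // Hypothetical flag mirroring blink's emptyColumnAsNull option.
    static boolean emptyColumnAsNull = true;

    static String parseField(String rawField) {
        // An empty, unquoted field becomes SQL NULL instead of "".
        return (rawField.isEmpty() && emptyColumnAsNull) ? null : rawField;
    }

    public static void main(String[] args) {
        System.out.println(parseField(""));   // null
        System.out.println(parseField("a"));  // a
    }
}
{code}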

> CsvTableSource can not config empty column as null
> --
>
> Key: FLINK-14924
> URL: https://issues.apache.org/jira/browse/FLINK-14924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.10.0
>
>
> CsvTableSource cannot be configured to treat an empty column as null; 
> it converts the column to an empty string by default.
>  





[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349447644
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   I cannot find `fix to_timestamp` in FLINK-14645.
   Please create a JIRA to actually fix `to_timestamp` to precision 9; it should 
be blocked by FLINK-14645.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349446929
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   So which stack would fail? Shouldn't this be a bug?




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349446907
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   FLINK-14645 would fix the de/serialization style in DescriptorProperties 
putTableSchema/getTableSchema and skip the conversion from DataType to 
TypeInformation (and back), which also fixes the to_timestamp in the DDL case.




[GitHub] [flink] openinx commented on issue #10270: [FLINK-14672][sql-client] Make Executor stateful in sql client

2019-11-21 Thread GitBox
openinx commented on issue #10270: [FLINK-14672][sql-client] Make Executor 
stateful in sql client
URL: https://github.com/apache/flink/pull/10270#issuecomment-557406344
 
 
   Synced with @aljoscha on Wednesday; he should know the issue I'm working on. Of 
course, feel free to give feedback on anything I missed. FYI @TisonKun, @aljoscha. 
Thanks.




[jira] [Created] (FLINK-14924) CsvTableSource can not config empty column as null

2019-11-21 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-14924:
--

 Summary: CsvTableSource can not config empty column as null
 Key: FLINK-14924
 URL: https://issues.apache.org/jira/browse/FLINK-14924
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
Reporter: Leonard Xu
 Fix For: 1.10.0


CsvTableSource cannot be configured to treat an empty column as null; 
it converts the column to an empty string by default.

 





[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349445639
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
 ##
 @@ -135,8 +138,24 @@ public RexNode visit(ValueLiteralExpression valueLiteral) 
{
return 
relBuilder.getRexBuilder().makeTimeLiteral(TimeString.fromCalendarFields(

valueAsCalendar(extractValue(valueLiteral, java.sql.Time.class))), 0);
case TIMESTAMP_WITHOUT_TIME_ZONE:
-   return 
relBuilder.getRexBuilder().makeTimestampLiteral(TimestampString.fromCalendarFields(
-   
valueAsCalendar(extractValue(valueLiteral, java.sql.Timestamp.class))), 3);
+   TimestampType timestampType = (TimestampType) 
type;
+   Class clazz = 
valueLiteral.getOutputDataType().getConversionClass();
+   LocalDateTime datetime = null;
+   if (clazz == LocalDateTime.class) {
+   datetime = extractValue(valueLiteral, 
LocalDateTime.class);
+   } else if (clazz == Timestamp.class) {
+   datetime = extractValue(valueLiteral, 
Timestamp.class).toLocalDateTime();
+   } else {
+   throw new 
TableException(String.format("Invalid literal of %s.", 
clazz.getCanonicalName()));
+   }
+   return 
relBuilder.getRexBuilder().makeTimestampLiteral(
+   new TimestampString(
 
 Review comment:
   I would say it's worth adding helper methods.
   First of all, this transformation is not simple: it requires 7+ method calls.
   It's not a single constructor call, and it's easy to forget `withNanos`.
   Besides, I don't think we will struggle to find a place for a reusable method.
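
For context, a sketch of the helper being argued for (name and placement are
assumptions): building Calcite's `TimestampString` from a `LocalDateTime`
chains seven getters plus the constructor and `withNanos`, and omitting
`withNanos` silently drops the sub-second part.

```java
import org.apache.calcite.util.TimestampString;

import java.time.LocalDateTime;

public class TimestampStringUtil {
    // Hypothetical helper: one place for the 7+ calls, withNanos included.
    public static TimestampString fromLocalDateTime(LocalDateTime dt) {
        return new TimestampString(
                dt.getYear(), dt.getMonthValue(), dt.getDayOfMonth(),
                dt.getHour(), dt.getMinute(), dt.getSecond())
            .withNanos(dt.getNano());
    }
}
```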




[GitHub] [flink] JingsongLi commented on issue #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on issue #10268: [Flink-14599][table-planner-blink] 
Support precision of TimestampType in blink planner
URL: https://github.com/apache/flink/pull/10268#issuecomment-557404480
 
 
   > `JavaUserDefinedScalarFunctions`.`JavaFunc5`
   
   Can you add a `LocalDateTime` too?




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349445023
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/DateTimeTestUtil.scala
 ##
 @@ -47,7 +46,8 @@ object DateTimeTestUtil {
 if (s == null) {
   null
 } else {
-  
LocalDateTimeConverter.INSTANCE.toExternal(DateTimeUtils.timestampStringToUnixDate(s))
+  LocalDateTimeConverter.INSTANCE.toExternal(
+
SqlTimestamp.fromEpochMillis(DateTimeUtils.timestampStringToUnixDate(s)))
 
 Review comment:
   This should support precision greater than 3.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349441793
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LegacyLocalDateTimeTypeInfo.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeComparator;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+
+import java.time.LocalDateTime;
+
+/**
+ * {@link TypeInformation} for {@link LocalDateTime}.
+ *
+ * The difference between Types.LOCAL_DATE_TIME is this TypeInformation 
holds a precision
+ * Reminder: Conversion from DateType to TypeInformation (and back) exists in
+ * TableSourceUtil.computeIndexMapping, which should be fixed after we remove 
Legacy TypeInformation
+ */
+public class LegacyLocalDateTimeTypeInfo extends 
LocalTimeTypeInfo {
+
+   private final int precision;
+
+   public LegacyLocalDateTimeTypeInfo(int precision) {
+   super(
+   LocalDateTime.class,
+   LocalDateTimeSerializer.INSTANCE,
+   LocalDateTimeComparator.class);
+   this.precision = precision;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
 
 Review comment:
   Please override `hashCode` too.
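
A minimal, self-contained illustration of the `equals`/`hashCode` contract
being requested; the real class would mix its `precision` field into the hash
the same way:

```java
public final class PrecisionTypeInfoDemo {
    private final int precision;

    public PrecisionTypeInfoDemo(int precision) {
        this.precision = precision;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof PrecisionTypeInfoDemo
                && ((PrecisionTypeInfoDemo) obj).precision == precision;
    }

    @Override
    public int hashCode() {
        // equals() compares only precision, so the hash must depend on
        // exactly that field for equal objects to hash equally.
        return Integer.hashCode(precision);
    }
}
```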




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349440840
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LegacyTimestampTypeInfo.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+
+import java.sql.Timestamp;
+
+/**
+ * {@link TypeInformation} for {@link Timestamp}.
+ *
+ * The difference between Types.SQL_TIMESTAMP is this TypeInformation holds 
a precision
+ * Reminder: Conversion from DateType to TypeInformation (and back) exists in
+ * TableSourceUtil.computeIndexMapping, which should be fixed after we remove 
Legacy TypeInformation
+ */
+public class LegacyTimestampTypeInfo extends SqlTimeTypeInfo {
+
+   private final int precision;
+
+   @SuppressWarnings("unchecked")
+   public LegacyTimestampTypeInfo(int precision) {
+   super(
+   Timestamp.class,
+   
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.INSTANCE,
 
 Review comment:
   import `SqlTimestampSerializer`




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349443459
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java
 ##
 @@ -44,7 +44,7 @@ public RowtimeProcessFunction(int rowtimeIdx, 
TypeInformation returnTyp
 
@Override
public void processElement(BaseRow value, Context ctx, 
Collector out) throws Exception {
-   long timestamp = value.getLong(rowtimeIdx);
+   long timestamp = value.getTimestamp(rowtimeIdx, 
3).getMillisecond();
 
 Review comment:
   Can a TIMESTAMP with precision 9 be a time attribute?




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349431966
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 ##
 @@ -590,8 +597,7 @@ object CodeGenUtils {
 case DOUBLE => s"$arrayTerm.setNullDouble($index)"
 case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
 case DATE => s"$arrayTerm.setNullInt($index)"
-case TIMESTAMP_WITHOUT_TIME_ZONE |
- TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
+case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
 
 Review comment:
   No need to modify this, but if you do want to, you can collapse all the 
`setNullLong` cases into a single `case _`.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349444601
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##
 @@ -691,27 +699,31 @@ LocalTime toExternalImpl(BaseRow row, int column) {
/**
 * Converter for LocalDateTime.
 */
-   public static final class LocalDateTimeConverter extends 
DataFormatConverter {
+   public static final class LocalDateTimeConverter extends 
DataFormatConverter {
 
private static final long serialVersionUID = 1L;
 
-   public static final LocalDateTimeConverter INSTANCE = new 
LocalDateTimeConverter();
+   public static final LocalDateTimeConverter INSTANCE = new 
LocalDateTimeConverter(3);
 
-   private LocalDateTimeConverter() {}
+   private final int precision;
+
+   private LocalDateTimeConverter(int precision) {
 
 Review comment:
   It should be `public`.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349444664
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##
 @@ -799,27 +811,31 @@ Time toExternalImpl(BaseRow row, int column) {
/**
 * Converter for timestamp.
 */
-   public static final class TimestampConverter extends 
DataFormatConverter {
+   public static final class TimestampConverter extends 
DataFormatConverter {
 
private static final long serialVersionUID = 
-779956524906131757L;
 
-   public static final TimestampConverter INSTANCE = new 
TimestampConverter();
+   public static final TimestampConverter INSTANCE = new 
TimestampConverter(3);
 
 Review comment:
   Ditto.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349442524
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/types/TypeInfoDataTypeConverter.java
 ##
 @@ -98,6 +105,20 @@ private static void addDefaultTypeInfo(Class clazz, 
TypeInformation typeIn
}
LogicalType logicalType = fromDataTypeToLogicalType(dataType);
switch (logicalType.getTypeRoot()) {
+   case TIMESTAMP_WITHOUT_TIME_ZONE:
+   TimestampType timestampType = (TimestampType) 
logicalType;
+   int precision = timestampType.getPrecision();
+   if (timestampType.getKind() == 
TimestampKind.REGULAR) {
+   return clazz == SqlTimestamp.class ?
+   new 
SqlTimestampTypeInfo(precision) :
+   (clazz == LocalDateTime.class ?
+   ((3 == precision) ?
 
 Review comment:
   Why should precision 3 map to `Types.LOCAL_DATE_TIME`? I think 
`Types.LOCAL_DATE_TIME` can express precision 9.
   Why treat precision 3 differently?




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349443102
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
 ##
 @@ -437,7 +439,9 @@ class SortCodeGenerator(
 t.getTypeRoot match {
   case _ if PlannerTypeUtils.isPrimitive(t) => true
   case VARCHAR | CHAR | VARBINARY | BINARY |
-   DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE => true
+   DATE | TIME_WITHOUT_TIME_ZONE => true
+  case TIMESTAMP_WITHOUT_TIME_ZONE =>
 
 Review comment:
   Precision 9 also supports a normalized key.
   If you don't want to support it now, please add a TODO comment to this class.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349444341
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/SqlDateTimeUtils.java
 ##
 @@ -205,6 +214,36 @@ public static long toTimestamp(Decimal v) {
// 

// String --> Timestamp conversion
// 

+   public static SqlTimestamp toSqlTimestamp(String dateStr) {
+   int length = dateStr.length();
+   String format;
+   if (length == 10) {
+   format = DATE_FORMAT_STRING;
+   } else if (length >= 21 && length <= 29) {
+   format = DEFAULT_DATETIME_FORMATS[length - 20];
+   } else {
+   // otherwise fall back to second's precision
+   format = DEFAULT_DATETIME_FORMATS[0];
+   }
+   return toSqlTimestamp(dateStr, format);
+   }
+
+   public static SqlTimestamp toSqlTimestamp(String dateStr, String 
format) {
+   DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern(format);
 
 Review comment:
   `DateTimeFormatter.ofPattern` is very expensive; please add a cache like the 
previous `FORMATTER_CACHE`.
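
A sketch of the suggested caching, assuming a simple concurrent map keyed by
the pattern string (the existing `FORMATTER_CACHE` may be structured
differently):

```java
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FormatterCacheDemo {
    private static final Map<String, DateTimeFormatter> FORMATTER_CACHE =
            new ConcurrentHashMap<>();

    // DateTimeFormatter is immutable and thread-safe, so a cached instance
    // can be shared across calls once built.
    public static DateTimeFormatter formatterFor(String format) {
        return FORMATTER_CACHE.computeIfAbsent(format, DateTimeFormatter::ofPattern);
    }
}
```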




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349444627
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##
 @@ -691,27 +699,31 @@ LocalTime toExternalImpl(BaseRow row, int column) {
/**
 * Converter for LocalDateTime.
 */
-   public static final class LocalDateTimeConverter extends 
DataFormatConverter {
+   public static final class LocalDateTimeConverter extends 
DataFormatConverter {
 
private static final long serialVersionUID = 1L;
 
-   public static final LocalDateTimeConverter INSTANCE = new 
LocalDateTimeConverter();
+   public static final LocalDateTimeConverter INSTANCE = new 
LocalDateTimeConverter(3);
 
 Review comment:
   Please remove this legacy one.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349440800
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/LegacyTimestampTypeInfo.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator;
+
+import java.sql.Timestamp;
+
+/**
+ * {@link TypeInformation} for {@link Timestamp}.
+ *
+ * The difference between Types.SQL_TIMESTAMP is this TypeInformation holds 
a precision
+ * Reminder: Conversion from DateType to TypeInformation (and back) exists in
+ * TableSourceUtil.computeIndexMapping, which should be fixed after we remove 
Legacy TypeInformation
+ */
+public class LegacyTimestampTypeInfo extends SqlTimeTypeInfo {
+
+   private final int precision;
+
+   @SuppressWarnings("unchecked")
+   public LegacyTimestampTypeInfo(int precision) {
+   super(
+   Timestamp.class,
+   
org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.INSTANCE,
+   (Class) SqlTimestampComparator.class);
+   this.precision = precision;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
 
 Review comment:
   Please override `hashCode` too.




[GitHub] [flink] JingsongLi commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
JingsongLi commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349444678
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##
 @@ -799,27 +811,31 @@ Time toExternalImpl(BaseRow row, int column) {
/**
 * Converter for timestamp.
 */
-   public static final class TimestampConverter extends 
DataFormatConverter {
+   public static final class TimestampConverter extends 
DataFormatConverter {
 
private static final long serialVersionUID = 
-779956524906131757L;
 
-   public static final TimestampConverter INSTANCE = new 
TimestampConverter();
+   public static final TimestampConverter INSTANCE = new 
TimestampConverter(3);
 
-   private TimestampConverter() {}
+   private final int precision;
+
+   private TimestampConverter(int precision) {
 
 Review comment:
   Ditto.




[GitHub] [flink] flinkbot commented on issue #10287: [FLINK-14915][runtime] Remove the unnecessary param JobGraph from SchedulingStrategyFactory#createInstance

2019-11-21 Thread GitBox
flinkbot commented on issue #10287: [FLINK-14915][runtime] Remove the 
unnecessary param JobGraph from SchedulingStrategyFactory#createInstance
URL: https://github.com/apache/flink/pull/10287#issuecomment-557403940
 
 
   
   ## CI report:
   
   * 91a9712e38f270006e6577e2976deb9993bb8243 : UNKNOWN
   
   




[GitHub] [flink] flinkbot commented on issue #10288: [FLINK-14765] Remove STATE_UPDATER in Execution

2019-11-21 Thread GitBox
flinkbot commented on issue #10288: [FLINK-14765] Remove STATE_UPDATER in 
Execution
URL: https://github.com/apache/flink/pull/10288#issuecomment-557403970
 
 
   
   ## CI report:
   
   * 018cc229676d6c6634ce4113342f46b42eb24416 : UNKNOWN
   
   




[GitHub] [flink] docete commented on a change in pull request #10268: [Flink-14599][table-planner-blink] Support precision of TimestampType in blink planner

2019-11-21 Thread GitBox
docete commented on a change in pull request #10268: 
[Flink-14599][table-planner-blink] Support precision of TimestampType in blink 
planner
URL: https://github.com/apache/flink/pull/10268#discussion_r349443370
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 ##
 @@ -644,10 +644,13 @@ public SqlMonotonicity 
getMonotonicity(SqlOperatorBinding call) {
OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
SqlFunctionCategory.STRING);
 
+   // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9)
 
 Review comment:
   `CatalogTableITCase`.`testInsertSourceTableWithFuncField` uses `to_timestamp`, 
and `LegacyTypeInfoDataTypeConverter`.`toLegacyTypeInfo` would fail if we used 
TIMESTAMP(9), since it would be converted to SQL_TIMESTAMP. 
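
An assumed repro of the failure described above, matching the stack trace
quoted earlier in this thread:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

public class Timestamp9Repro {
    public static void main(String[] args) {
        // Precision 3 round-trips to the legacy type system ...
        DataType t3 = DataTypes.TIMESTAMP(3);
        TypeConversions.fromDataTypeToLegacyInfo(t3);

        // ... but precision 9 has no legacy counterpart and throws
        // "Unsupported conversion from data type 'TIMESTAMP(9)' ...".
        DataType t9 = DataTypes.TIMESTAMP(9);
        TypeConversions.fromDataTypeToLegacyInfo(t9);
    }
}
```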




[jira] [Commented] (FLINK-14883) Resource management on state backends

2019-11-21 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979880#comment-16979880
 ] 

Yu Li commented on FLINK-14883:
---

{quote}
We have built FrocksDB release jar package on mac, linux32, linux64, windows 
and ppc64le successfully with frocksdb-4, frocksdb-5 and frocksdb-6.

Until now the jar package with frocksdb-4 and frocksdb-5 have passed the 
Flink-CI, state-benchmark and e2e tests of state_ttl on ppc64le.

We're still working on the release with newly-included frocksdb-6
{quote}
[~yunta] I guess this comment belongs to FLINK-14483 instead of here; mind 
migrating it to the right place? Thanks.

> Resource management on state backends
> -
>
> Key: FLINK-14883
> URL: https://issues.apache.org/jira/browse/FLINK-14883
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Priority: Major
>  Labels: resource-management
>
> This is the umbrella issue for resource management on state backends, 
> especially the memory management, as mentioned in 
> [FLIP-49|https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors]




