[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-09-21 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r219524046
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: 
Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit 
'$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', 
'${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, 
SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
+
+relBuilder.getRexBuilder.makeCast(intType, rexCall)
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
 
 Review comment:
   Calcite returns an integer so far. See 
`org.apache.calcite.sql.fun.SqlTimestampDiffFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-09-21 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r219524046
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -338,3 +341,66 @@ case class DateFormat(timestamp: Expression, format: 
Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit 
'$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', 
'${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, 
SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
+List(timestamp2.toRexNode, timestamp1.toRexNode))
+
+val intType = typeFactory.createSqlType(SqlTypeName.INTEGER)
+
+relBuilder.getRexBuilder.makeCast(intType, rexCall)
+  }
+
+  override def toString: String = s"timestampDiff(${children.mkString(", ")})"
+
+  override private[flink] def resultType = INT_TYPE_INFO
 
 Review comment:
   Calcite returns an integer so far. See 
`org.apache.calcite.sql.fun.SqlTimestampDiffFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208163167
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ##
 @@ -861,6 +865,35 @@ object ScalarOperators {
   (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, 
$op($r))"
 }
 
+  case (l: SqlTimeTypeInfo[_], r: SqlTimeTypeInfo[_]) if !plus =>
+generateOperatorIfNotNull(nullCheck, LONG_TYPE_INFO, left, right) {
+  (ll, rr) => typeName match {
+case SqlTypeName.INTERVAL_YEAR | SqlTypeName.INTERVAL_MONTH =>
+  (l, r) match {
+case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+s"($ll, $rr * ${MILLIS_PER_DAY}L)"
+case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+  s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}" +
+s"($ll * ${MILLIS_PER_DAY}L, $rr)"
+case _ =>
+  
s"${qualifyMethod(BuiltInMethod.SUBTRACT_MONTHS.method)}($ll, $rr)"
+  }
+
+case _ =>
+  (l, r) match {
+case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIMESTAMP) =>
+  s"$ll $op $rr"
 
 Review comment:
   When setting a breakpoint here, you can see that a `TimeIntervalTypeInfo` is 
requested but we return a `LongTypeInfo` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208151120
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -2247,6 +2247,17 @@ TIMESTAMPADD(unit, interval, timestamp)
   
 
 
+
+  
+{% highlight text %}
+TIMESTAMPDIFF(unit, timestamp1, timestamp2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between 
timestamp1 and timestamp2. The unit for the interval is given by the unit 
argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, WEEK, 
MONTH, QUARTER, or YEAR. E.g. 
TIMESTAMPDIFF(DAY, DATE '2003-01-02', DATE '2003-01-03') leads to 
1.
 
 Review comment:
   Could you create a table of time interval units and time point units? We 
could move this to a subsection and reference it from all functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208169001
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -1943,6 +1943,147 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "false")
   }
 
+  @Test
+  def testTimestampDiff(): Unit = {
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03 11:11:11".toTimestamp,
+  "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03 11:11:11'.toTimestamp",
+  "17280"
+)
+
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03".toDate,
+  "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03'.toDate",
+  "213071000"
+)
+
+testTableApi(
+  "2018-07-03".toDate - "2018-07-05 11:11:11".toTimestamp,
+  "'2018-07-03'.toDate - '2018-07-05 11:11:11'.toTimestamp",
+  "-213071000"
+)
+
+testTableApi(
+  "2018-07-05".toDate - "2018-07-03".toDate,
+  "'2018-07-05'.toDate - '2018-07-03'.toDate",
+  "17280"
+)
+
+testAllApis(
+  timestampDiff(
+TimeIntervalUnit.DAY,
+"2018-07-03 11:11:11".toTimestamp,
+"2018-07-05 11:11:11".toTimestamp),
+  "timestampDiff(DAY, '2018-07-03 11:11:11'.toTimestamp," +
+"'2018-07-05 11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(DAY, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-07-05 11:11:11')",
+  "2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.DAY, "2016-06-15".toDate, "2016-06-16 
11:11:11".toTimestamp),
+  "timestampDiff(DAY, '2016-06-15'.toDate, '2016-06-16 
11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(DAY, DATE '2016-06-15', TIMESTAMP '2016-06-16 11:11:11')",
+  "1")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.DAY, "2016-06-15 11:00:00".toTimestamp, 
"2016-06-19".toDate),
+  "timestampDiff(DAY, '2016-06-15 11:00:00'.toTimestamp, 
'2016-06-19'.toDate)",
+  "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-06-15 11:00:00', DATE '2016-06-19')",
+  "3")
+
+testAllApis(
+  timestampDiff(
+  TimeIntervalUnit.MONTH,
+  "2018-07-03 11:11:11".toTimestamp,
+  "2018-09-05 11:11:11".toTimestamp),
+  "timestampDiff(MONTH, " +
+"'2018-07-03 11:11:11'.toTimestamp, '2018-09-05 
11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(MONTH, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-09-05 11:11:11')",
+  "2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.MONTH, "2016-06-15".toDate, 
"2016-04-18".toDate),
+  "TIMESTAMPDIFF(MONTH, '2016-06-15'.toDate, '2016-04-18'.toDate)",
+  "TIMESTAMPDIFF(MONTH, DATE '2016-06-15', DATE '2016-04-18')",
+  "-1")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.MONTH, "2016-06-15".toDate, 
"2016-04-14".toDate),
+  "timestampDiff(MONTH, '2016-06-15'.toDate, '2016-04-14'.toDate)",
+  "TIMESTAMPDIFF(MONTH, DATE '2016-06-15', DATE '2016-04-14')",
+  "-2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.YEAR, "2016-06-15".toDate, "2019-06-16 
11:11:11".toTimestamp),
+  "timestampDiff(YEAR, '2016-06-15'.toDate, '2019-06-16 
11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(YEAR, DATE '2016-06-15', TIMESTAMP '2019-06-16 
11:11:11')",
+  "3")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.YEAR, "2016-06-15 11:00:00".toTimestamp, 
"2013-06-19".toDate),
+  "timestampDiff(YEAR, '2016-06-15 11:00:00'.toTimestamp, 
'2013-06-19'.toDate)",
+  "TIMESTAMPDIFF(YEAR, TIMESTAMP '2016-06-15 11:00:00', DATE 
'2013-06-19')",
+  "-2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.DAY, "2016-06-15".toDate, 
"2016-06-18".toDate),
+  "timestampDiff(DAY, '2016-06-15'.toDate, '2016-06-18'.toDate)",
+  "TIMESTAMPDIFF(DAY, DATE '2016-06-15', DATE '2016-06-18')",
+  "3")
+
+testAllApis(
+  timestampDiff(
+TimeIntervalUnit.HOUR,
+"2018-07-03 11:11:11".toTimestamp,
+"2018-07-04 12:12:11".toTimestamp),
+  "timestampDiff(HOUR, '2018-07-03 11:11:11'.toTimestamp," +
+"'2018-07-04 12:12:11'.toTimestamp)",
+  "TIMESTAMPDIFF(HOUR, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-07-04 12:12:11')",
+  "25")
+
+testAllApis(
+  timestampDiff(
+TimeIntervalUnit.HOUR,
+"2018-07-03 11:11:11".toTimestamp,
+"2018-07-04 12:10:11".toTimestamp),
+  "timestampDiff(HOUR, '2018-07-03 11:11:11'.toTimestamp," +
+"'2018-07-04 12:10:11'.toTimestamp)",
+  "TIMESTAMPDIFF(HOUR, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-07-04 12:10:11')",
+  "24")
+
+testAllApis(
+  timestampDiff(
+TimeIntervalUnit.MINUTE,
+"2018-07-03 11:11:11".toTimestamp,
+"2018-07-03 12:10:11".toTimestamp),
+  "timestampDiff(MINUTE, 

[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208151365
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -4544,6 +4544,16 @@ dateFormat(TIMESTAMP, STRING)
   
 
 
+
+  
+{% highlight scala %}
 
 Review comment:
   Please also update the Java documentation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208167533
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -1943,6 +1943,147 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "false")
   }
 
+  @Test
+  def testTimestampDiff(): Unit = {
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03 11:11:11".toTimestamp,
+  "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03 11:11:11'.toTimestamp",
+  "17280"
+)
+
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03".toDate,
+  "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03'.toDate",
+  "213071000"
+)
+
+testTableApi(
+  "2018-07-03".toDate - "2018-07-05 11:11:11".toTimestamp,
+  "'2018-07-03'.toDate - '2018-07-05 11:11:11'.toTimestamp",
+  "-213071000"
+)
+
+testTableApi(
+  "2018-07-05".toDate - "2018-07-03".toDate,
+  "'2018-07-05'.toDate - '2018-07-03'.toDate",
+  "17280"
+)
+
+testAllApis(
+  timestampDiff(
+TimeIntervalUnit.DAY,
+"2018-07-03 11:11:11".toTimestamp,
+"2018-07-05 11:11:11".toTimestamp),
+  "timestampDiff(DAY, '2018-07-03 11:11:11'.toTimestamp," +
+"'2018-07-05 11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(DAY, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-07-05 11:11:11')",
+  "2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.DAY, "2016-06-15".toDate, "2016-06-16 
11:11:11".toTimestamp),
+  "timestampDiff(DAY, '2016-06-15'.toDate, '2016-06-16 
11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(DAY, DATE '2016-06-15', TIMESTAMP '2016-06-16 11:11:11')",
+  "1")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.DAY, "2016-06-15 11:00:00".toTimestamp, 
"2016-06-19".toDate),
+  "timestampDiff(DAY, '2016-06-15 11:00:00'.toTimestamp, 
'2016-06-19'.toDate)",
+  "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-06-15 11:00:00', DATE '2016-06-19')",
+  "3")
+
+testAllApis(
+  timestampDiff(
+  TimeIntervalUnit.MONTH,
+  "2018-07-03 11:11:11".toTimestamp,
+  "2018-09-05 11:11:11".toTimestamp),
+  "timestampDiff(MONTH, " +
+"'2018-07-03 11:11:11'.toTimestamp, '2018-09-05 
11:11:11'.toTimestamp)",
+  "TIMESTAMPDIFF(MONTH, " +
+"TIMESTAMP '2018-07-03 11:11:11', TIMESTAMP '2018-09-05 11:11:11')",
+  "2")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.MONTH, "2016-06-15".toDate, 
"2016-04-18".toDate),
+  "TIMESTAMPDIFF(MONTH, '2016-06-15'.toDate, '2016-04-18'.toDate)",
+  "TIMESTAMPDIFF(MONTH, DATE '2016-06-15', DATE '2016-04-18')",
+  "-1")
+
+testAllApis(
+  timestampDiff(TimeIntervalUnit.MONTH, "2016-06-15".toDate, 
"2016-04-14".toDate),
 
 Review comment:
   We should test the full matrix. MONTH on a (date, timestamp) pair is missing.
   
   Maybe we can make the tests more generic, similar to the timestamp add tests. 
We should test every combination of (unit, type, type).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208163587
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -340,3 +343,66 @@ case class DateFormat(timestamp: Expression, format: 
Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit 
'$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', 
'${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, 
SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
 
 Review comment:
   Why don't we simply call `SqlStdOperatorTable#TIMESTAMP_DIFF` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208164868
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -1943,6 +1943,147 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "false")
   }
 
+  @Test
+  def testTimestampDiff(): Unit = {
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03 11:11:11".toTimestamp,
+  "'2018-07-05 11:11:11'.toTimestamp - '2018-07-03 11:11:11'.toTimestamp",
+  "17280"
+)
+
+testTableApi(
+  "2018-07-05 11:11:11".toTimestamp - "2018-07-03".toDate,
 
 Review comment:
   Do we really want to support subtracting a date from a timestamp? Is the 
result useful? Shouldn't the result be another timestamp?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208151812
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ##
 @@ -769,9 +769,10 @@ abstract class CodeGenerator(
   case PLUS | DATETIME_PLUS if isTemporal(resultType) =>
 val left = operands.head
 val right = operands(1)
+val typeName = call.getType().getSqlTypeName()
 
 Review comment:
   Remove this line and use `resultType` instead? This would also simplify the 
patterns in `generateTemporalPlusMinus`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208161286
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ##
 @@ -861,6 +865,35 @@ object ScalarOperators {
   (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, 
$op($r))"
 }
 
+  case (l: SqlTimeTypeInfo[_], r: SqlTimeTypeInfo[_]) if !plus =>
+generateOperatorIfNotNull(nullCheck, LONG_TYPE_INFO, left, right) {
 
 Review comment:
   Is always returning `LONG_TYPE_INFO` in sync with MySQL and Calcite? We need 
to make sure that the type information is in sync with Calcite's SQL type. 
Otherwise we get a type mismatch in subsequent operations.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-07 Thread GitBox
twalthr commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] 
[table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208150447
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ##
 @@ -1095,6 +1095,34 @@ object dateFormat {
   }
 }
 
+/**
+ * Returns the (signed) number of timeUnit intervals between timestamp1 and 
timestamp2.
+ *
+ * For example timestampDiff(TimeIntervalUnit.DAY, "2016-06-15".toDate,
 
 Review comment:
   Scaladoc uses `{{{  }}}` or backticks for code examples.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services