Repository: flink
Updated Branches:
  refs/heads/master 3507d59f9 -> e6fbda906


[FLINK-3580] [table] Add OVERLAPS function

This closes #2468.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6fbda90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6fbda90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6fbda90

Branch: refs/heads/master
Commit: e6fbda906a173660df306e78eee010ed3fc59d8e
Parents: 3507d59
Author: twalthr <twal...@apache.org>
Authored: Sat Sep 3 08:00:58 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Sep 22 14:02:30 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  22 +++
 .../flink/api/scala/table/expressionDsl.scala   |  28 ++++
 .../flink/api/table/expressions/time.scala      |  98 ++++++++++++
 .../api/table/validate/FunctionCatalog.scala    |   3 +-
 .../table/expressions/ScalarFunctionsTest.scala | 150 ++++++++++++-------
 5 files changed, 249 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index b88a7da..72b88a6 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1623,6 +1623,17 @@ localTimestamp()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Determines whether two anchored time intervals overlap. Time point 
and temporal are transformed into a range defined by two time points (start, 
end). The function evaluates <code>leftEnd >= rightStart && rightEnd >= 
leftStart</code>. E.g. <code>temporalOverlaps("2:55:00".toTime, 1.hour, 
"3:30:00".toTime, 2.hour)</code> leads to true.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -2030,6 +2041,17 @@ localTimestamp()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Determines whether two anchored time intervals overlap. Time point 
and temporal are transformed into a range defined by two time points (start, 
end). The function evaluates <code>leftEnd >= rightStart && rightEnd >= 
leftStart</code>. E.g. <code>temporalOverlaps('2:55:00'.toTime, 1.hour, 
'3:30:00'.toTime, 2.hour)</code> leads to true.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 003b8b2..9c2721b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -508,5 +508,33 @@ object localTimestamp {
   }
 }
 
+/**
+  * Determines whether two anchored time intervals overlap. Time point and 
temporal are
+  * transformed into a range defined by two time points (start, end). The 
function
+  * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
+  *
+  * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+  *
+  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) 
leads to true
+  */
+object temporalOverlaps {
+
+  /**
+    * Determines whether two anchored time intervals overlap. Time point and 
temporal are
+    * transformed into a range defined by two time points (start, end).
+    *
+    * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
+    *
+    * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 
2.hour) leads to true
+    */
+  def apply(
+      leftTimePoint: Expression,
+      leftTemporal: Expression,
+      rightTimePoint: Expression,
+      rightTemporal: Expression): Expression = {
+    TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
+  }
+}
+
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index 4b1942e..1f6361e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.FlinkRelBuilder
 import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, 
getFactor, mod}
 import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
 import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils}
 import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure, ValidationSuccess}
 
@@ -277,3 +278,100 @@ case class Quarter(child: Expression) extends 
UnaryExpression with InputTypeSpec
   }
 }
 
+/**
+  * Determines whether two anchored time intervals overlap.
+  */
+case class TemporalOverlaps(
+    leftTimePoint: Expression,
+    leftTemporal: Expression,
+    rightTimePoint: Expression,
+    rightTemporal: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+    Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
+
+  override private[flink] def resultType: TypeInformation[_] = 
BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires 
leftTimePoint to be of type " +
+        s"Time Point, but get ${leftTimePoint.resultType}.")
+    }
+    if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires 
rightTimePoint to be of " +
+        s"type Time Point, but get ${rightTimePoint.resultType}.")
+    }
+    if (leftTimePoint.resultType != rightTimePoint.resultType) {
+      return ValidationFailure(s"TemporalOverlaps operator requires 
leftTimePoint and " +
+        s"rightTimePoint to be of same type.")
+    }
+
+    // leftTemporal is point, then it must be comparable with leftTimePoint
+    if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) {
+      if (leftTemporal.resultType != leftTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires 
leftTemporal and " +
+          s"leftTimePoint to be of same type if leftTemporal is of type Time 
Point.")
+      }
+    } else if (!isTimeInterval(leftTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires 
leftTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+
+    // rightTemporal is point, then it must be comparable with rightTimePoint
+    if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) {
+      if (rightTemporal.resultType != rightTimePoint.resultType) {
+        return ValidationFailure(s"TemporalOverlaps operator requires 
rightTemporal and " +
+          s"rightTimePoint to be of same type if rightTemporal is of type Time 
Point.")
+      }
+    } else if (!isTimeInterval(rightTemporal.resultType)) {
+      return ValidationFailure(s"TemporalOverlaps operator requires 
rightTemporal to be of " +
+        s"type Time Point or Time Interval.")
+    }
+    ValidationSuccess
+  }
+
+  override def toString: String = s"temporalOverlaps(${children.mkString(", 
")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    convertOverlaps(
+      leftTimePoint.toRexNode,
+      leftTemporal.toRexNode,
+      rightTimePoint.toRexNode,
+      rightTemporal.toRexNode,
+      relBuilder.asInstanceOf[FlinkRelBuilder])
+  }
+
+  /**
+    * Standard conversion of the OVERLAPS operator.
+    * Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+    */
+  private def convertOverlaps(
+      leftP: RexNode,
+      leftT: RexNode,
+      rightP: RexNode,
+      rightT: RexNode,
+      relBuilder: FlinkRelBuilder)
+    : RexNode = {
+    // leftT = leftP + leftT if leftT is an interval
+    val convLeftT = if (isTimeInterval(leftTemporal.resultType)) {
+        relBuilder.call(SqlStdOperatorTable.PLUS, leftP, leftT)
+      } else {
+        leftT
+      }
+    // rightT = rightP + rightT if rightT is an interval
+    val convRightT = if (isTimeInterval(rightTemporal.resultType)) {
+        relBuilder.call(SqlStdOperatorTable.PLUS, rightP, rightT)
+      } else {
+        rightT
+      }
+    // leftT >= rightP
+    val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
convLeftT, rightP)
+    // rightT >= leftP
+    val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
convRightT, leftP)
+
+    // leftT >= rightP and rightT >= leftP
+    relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index e8a2971..42d460e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -160,7 +160,8 @@ object FunctionCatalog {
     "currentTimestamp" -> classOf[CurrentTimestamp],
     "localTime" -> classOf[LocalTime],
     "localTimestamp" -> classOf[LocalTimestamp],
-    "quarter" -> classOf[Quarter]
+    "quarter" -> classOf[Quarter],
+    "temporalOverlaps" -> classOf[TemporalOverlaps]
 
     // TODO implement function overloading here
     // "floor" -> classOf[TemporalFloor]

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fbda90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 672b876..5506793 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -230,6 +230,10 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "false")
   }
 
+  // 
----------------------------------------------------------------------------------------------
+  // Math functions
+  // 
----------------------------------------------------------------------------------------------
+
   @Test
   def testMod(): Unit = {
     testAllApis(
@@ -513,6 +517,10 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "-1231")
   }
 
+  // 
----------------------------------------------------------------------------------------------
+  // Temporal functions
+  // 
----------------------------------------------------------------------------------------------
+
   @Test
   def testExtract(): Unit = {
     testAllApis(
@@ -748,57 +756,6 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   }
 
   @Test
-  def testIsTrueIsFalse(): Unit = {
-    testAllApis(
-      'f1.isTrue,
-      "f1.isTrue",
-      "f1 IS TRUE",
-      "true")
-
-    testAllApis(
-      'f21.isTrue,
-      "f21.isTrue",
-      "f21 IS TRUE",
-      "false")
-
-    testAllApis(
-      false.isFalse,
-      "false.isFalse",
-      "FALSE IS FALSE",
-      "true")
-
-    testAllApis(
-      'f21.isFalse,
-      "f21.isFalse",
-      "f21 IS FALSE",
-      "false")
-
-    testAllApis(
-      !'f1.isTrue,
-      "!f1.isTrue",
-      "f1 IS NOT TRUE",
-      "false")
-
-    testAllApis(
-      !'f21.isTrue,
-      "!f21.isTrue",
-      "f21 IS NOT TRUE",
-      "true")
-
-    testAllApis(
-      !false.isFalse,
-      "!false.isFalse",
-      "FALSE IS NOT FALSE",
-      "false")
-
-    testAllApis(
-      !'f21.isFalse,
-      "!f21.isFalse",
-      "f21 IS NOT FALSE",
-      "true")
-  }
-
-  @Test
   def testCurrentTimePoint(): Unit = {
 
     // current time points are non-deterministic
@@ -844,6 +801,42 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   }
 
   @Test
+  def testOverlaps(): Unit = {
+    testAllApis(
+      temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour),
+      "temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hour)",
+      "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL 
'2' HOUR)",
+      "true")
+
+    testAllApis(
+      temporalOverlaps("9:00:00".toTime, "9:30:00".toTime, "9:29:00".toTime, 
"9:31:00".toTime),
+      "temporalOverlaps('9:00:00'.toTime, '9:30:00'.toTime, '9:29:00'.toTime, 
'9:31:00'.toTime)",
+      "(TIME '9:00:00', TIME '9:30:00') OVERLAPS (TIME '9:29:00', TIME 
'9:31:00')",
+      "true")
+
+    testAllApis(
+      temporalOverlaps("9:00:00".toTime, "10:00:00".toTime, "10:15:00".toTime, 
3.hour),
+      "temporalOverlaps('9:00:00'.toTime, '10:00:00'.toTime, 
'10:15:00'.toTime, 3.hour)",
+      "(TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL 
'3' HOUR)",
+      "false")
+
+    testAllApis(
+      temporalOverlaps("2011-03-10".toDate, 10.day, "2011-03-19".toDate, 
10.day),
+      "temporalOverlaps('2011-03-10'.toDate, 10.day, '2011-03-19'.toDate, 
10.day)",
+      "(DATE '2011-03-10', INTERVAL '10' DAY) OVERLAPS (DATE '2011-03-19', 
INTERVAL '10' DAY)",
+      "true")
+
+    testAllApis(
+      temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli,
+        "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 
02:02:02.002".toTimestamp),
+      "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " +
+        "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 
02:02:02.002'.toTimestamp)",
+      "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " +
+        "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 
02:02:02.002')",
+      "false")
+  }
+
+  @Test
   def testQuarter(): Unit = {
     testAllApis(
       "1997-01-27".toDate.quarter(),
@@ -865,6 +858,61 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   }
 
   // 
----------------------------------------------------------------------------------------------
+  // Other functions
+  // 
----------------------------------------------------------------------------------------------
+
+  @Test
+  def testIsTrueIsFalse(): Unit = {
+    testAllApis(
+      'f1.isTrue,
+      "f1.isTrue",
+      "f1 IS TRUE",
+      "true")
+
+    testAllApis(
+      'f21.isTrue,
+      "f21.isTrue",
+      "f21 IS TRUE",
+      "false")
+
+    testAllApis(
+      false.isFalse,
+      "false.isFalse",
+      "FALSE IS FALSE",
+      "true")
+
+    testAllApis(
+      'f21.isFalse,
+      "f21.isFalse",
+      "f21 IS FALSE",
+      "false")
+
+    testAllApis(
+      !'f1.isTrue,
+      "!f1.isTrue",
+      "f1 IS NOT TRUE",
+      "false")
+
+    testAllApis(
+      !'f21.isTrue,
+      "!f21.isTrue",
+      "f21 IS NOT TRUE",
+      "true")
+
+    testAllApis(
+      !false.isFalse,
+      "!false.isFalse",
+      "FALSE IS NOT FALSE",
+      "false")
+
+    testAllApis(
+      !'f21.isFalse,
+      "!f21.isFalse",
+      "f21 IS NOT FALSE",
+      "true")
+  }
+
+  // 
----------------------------------------------------------------------------------------------
 
   def testData = {
     val testData = new Row(22)

Reply via email to