[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5140


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159695882
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -196,35 +181,38 @@ abstract class TimeBoundedStreamJoin(
         if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
           val rightRows = rightEntry.getValue
           var i = 0
-          var markEmitted = false
+          var entryUpdated = false
           while (i < rightRows.size) {
-            joinCollector.resetThisTurn()
+            joinCollector.reset()
             val tuple = rightRows.get(i)
             joinFunction.join(leftRow, tuple.f0, joinCollector)
-            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
-              if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-                // Mark the right row as being successfully joined and emitted.
-                tuple.f1 = true
-                markEmitted = true
+            if (joinCollector.emitted) {
--- End diff --

change to

```
emitted = emitted || joinCollector.emitted
if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
  if (!tuple.f1 && joinCollector.emitted) {
    // Mark the right row as being successfully joined and emitted.
    tuple.f1 = true
    entryUpdated = true
  }
}
```

to avoid a condition for inner and left joins
  


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159719430
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -183,23 +190,48 @@ class DataStreamWindowJoin(
 }
   }
 
-  def createEmptyJoin(
+  def createNegativeWindowSizeJoin(
--- End diff --

I think we can make this even more efficient if we implement this as:

```
def createNegativeWindowSizeJoin(
    joinType: JoinType,
    leftInput: DataStream[CRow],
    rightInput: DataStream[CRow],
    leftArity: Int,
    rightArity: Int,
    returnType: TypeInformation[CRow]): DataStream[CRow] = {

  // we filter all records instead of adding an empty source to preserve the watermarks
  val allFilter = new FlatMapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    override def flatMap(value: CRow, out: Collector[CRow]): Unit = { }
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padLeft(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val rightPadder = new MapFunction[CRow, CRow] with ResultTypeQueryable[CRow] {
    val paddingUtil = new OuterJoinPaddingUtil(leftArity, rightArity)
    override def map(value: CRow): CRow = new CRow(paddingUtil.padRight(value.row), true)
    override def getProducedType: TypeInformation[CRow] = returnType
  }

  val leftP = leftInput.getParallelism
  val rightP = rightInput.getParallelism

  joinType match {
    case JoinType.INNER =>
      leftInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Empty Inner Join").setParallelism(rightP))
    case JoinType.LEFT_OUTER =>
      leftInput.map(leftPadder).name("Left Outer Join").setParallelism(leftP)
        .union(rightInput.flatMap(allFilter).name("Left Outer Join").setParallelism(rightP))
    case JoinType.RIGHT_OUTER =>
      leftInput.flatMap(allFilter).name("Right Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Right Outer Join").setParallelism(rightP))
    case JoinType.FULL_OUTER =>
      leftInput.map(leftPadder).name("Full Outer Join").setParallelism(leftP)
        .union(rightInput.map(rightPadder).name("Full Outer Join").setParallelism(rightP))
  }
}
```

We also need to make `OuterJoinPaddingUtil` extend `java.io.Serializable` for this.
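
For reference, a minimal sketch of what such a serializable padding util could look like (the `padLeft`/`padRight` signatures follow the snippet above; the internals are an assumption):

```
import org.apache.flink.types.Row

// Sketch only: pads a one-sided row to the full (left + right) output arity,
// leaving the missing side's fields null. Extending java.io.Serializable lets
// instances be captured by the anonymous MapFunctions above.
class OuterJoinPaddingUtil(leftArity: Int, rightArity: Int) extends java.io.Serializable {

  private val resultArity = leftArity + rightArity

  /** Returns a result row with the left fields copied and the right fields null. */
  final def padLeft(leftRow: Row): Row = {
    val result = new Row(resultArity)
    var i = 0
    while (i < leftArity) {
      result.setField(i, leftRow.getField(i))
      i += 1
    }
    result
  }

  /** Returns a result row with the right fields copied and the left fields null. */
  final def padRight(rightRow: Row): Row = {
    val result = new Row(resultArity)
    var i = 0
    while (i < rightArity) {
      result.setField(leftArity + i, rightRow.getField(i))
      i += 1
    }
    result
  }
}
```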



---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159697707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
     * Remove the expired rows. Register a new timer if the cache still holds valid rows
     * after the cleaning up.
     *
-    * @param collector  the collector to emit results
--- End diff --

Don't remove parameter documentation for `collector`


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159700432
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -344,23 +434,42 @@ abstract class TimeBoundedStreamInnerJoin(
     * @param removeLeft whether to remove the left rows
     */
   private def removeExpiredRows(
+      collector: EmitAwareCollector,
       expirationTime: Long,
-      rowCache: MapState[Long, JList[Row]],
+      rowCache: MapState[Long, JList[JTuple2[Row, Boolean]]],
       timerState: ValueState[Long],
       ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
       removeLeft: Boolean): Unit = {

-    val keysIterator = rowCache.keys().iterator()
+    val iterator = rowCache.iterator()

     var earliestTimestamp: Long = -1L
-    var rowTime: Long = 0L

     // We remove all expired keys and do not leave the loop early.
     // Hence, we do a full pass over the state.
-    while (keysIterator.hasNext) {
-      rowTime = keysIterator.next
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val rowTime = entry.getKey
       if (rowTime <= expirationTime) {
-        keysIterator.remove()
+        if ((joinType == JoinType.RIGHT_OUTER && !removeLeft) ||
--- End diff --

Refactor to 

```
if (removeLeft &&
    (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padLeft(tuple.f0))
    }
    i += 1
  }
} else if (!removeLeft &&
    (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER)) {
  val rows = entry.getValue
  var i = 0
  while (i < rows.size) {
    val tuple = rows.get(i)
    if (!tuple.f1) {
      // Emit a null padding result if the row has never been successfully joined.
      collector.collect(paddingUtil.padRight(tuple.f0))
    }
    i += 1
  }
}
iterator.remove()
```

to reduce the number of conditions.
  


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159698104
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -437,15 +427,14 @@ abstract class TimeBoundedStreamJoin(
     * Remove the expired rows. Register a new timer if the cache still holds valid rows
     * after the cleaning up.
     *
-    * @param collector  the collector to emit results
     * @param expirationTime the expiration time for this cache
     * @param rowCache   the row cache
     * @param timerState timer state for the opposite stream
     * @param ctx        the context to register the cleanup timer
     * @param removeLeft whether to remove the left rows
     */
   private def removeExpiredRows(
-      collector: Collector[Row],
--- End diff --

Why did you change the type? 
`EmitAwareCollector` is a `Collector[Row]`.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159697015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -288,34 +276,37 @@ abstract class TimeBoundedStreamJoin(
         if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
           val leftRows = leftEntry.getValue
           var i = 0
-          var markEmitted = false
+          var entryUpdated = false
           while (i < leftRows.size) {
-            joinCollector.resetThisTurn()
+            joinCollector.reset()
             val tuple = leftRows.get(i)
             joinFunction.join(tuple.f0, rightRow, joinCollector)
-            if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
-              if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
-                // Mark the left row as being successfully joined and emitted.
-                tuple.f1 = true
-                markEmitted = true
+            if (joinCollector.emitted) {
--- End diff --

same as for `processElement1()`


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
         if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
           val rightRows = rightEntry.getValue
           var i = 0
+          var markEmitted = false
           while (i < rightRows.size) {
-            joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+            joinCollector.resetThisTurn()
+            val tuple = rightRows.get(i)
+            joinFunction.join(leftRow, tuple.f0, joinCollector)
+            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+              if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+                // Mark the right row as being successfully joined and emitted.
+                tuple.f1 = true
+                markEmitted = true
+              }
+            }
             i += 1
           }
+          if (markEmitted) {
+            // Write back the edited entry (mark emitted) for the right cache.
+            rightEntry.setValue(rightRows)
+          }
         }

         if (rightTime <= rightExpirationTime) {
+          if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
--- End diff --

Yes, I should have added a harness test for that.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159069074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
+
+  private var emitted = false
+  private var emittedThisTurn = false
+  private var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
--- End diff --

I'll add a function to set this value in the Collector.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159023858
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
       s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
       s"join: (${joinSelectionToString(schema.relDataType)})"

-    joinType match {
-      case JoinRelType.INNER =>
-        if (relativeWindowSize < 0) {
-          LOG.warn(s"The relative window size $relativeWindowSize is negative," +
-            " please check the join conditions.")
-          createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
-        } else {
-          if (isRowTime) {
-            createRowTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          } else {
-            createProcTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          }
-        }
-      case JoinRelType.FULL =>
-        throw new TableException(
-          "Full join between stream and stream is not supported yet.")
-      case JoinRelType.LEFT =>
-        throw new TableException(
-          "Left join between stream and stream is not supported yet.")
-      case JoinRelType.RIGHT =>
-        throw new TableException(
-          "Right join between stream and stream is not supported yet.")
+    val flinkJoinType = joinType match {
+      case JoinRelType.INNER => JoinType.INNER
+      case JoinRelType.FULL => JoinType.FULL_OUTER
+      case JoinRelType.LEFT => JoinType.LEFT_OUTER
+      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+    }
+
+    if (relativeWindowSize < 0) {
+      LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+        " please check the join conditions.")
+      createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --

Yes, you are right. I'll add this part.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012674
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -268,13 +364,16 @@ abstract class TimeBoundedStreamInnerJoin(
       ctx: CoProcessFunction[CRow, CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {

+    joinCollector.setCollector(out)
+    joinCollector.reset()
--- End diff --

No need to reset


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r158738627
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
+
+  private var emitted = false
+  private var emittedThisTurn = false
+  private var innerCollector: Collector[CRow] = _
+  private val cRow: CRow = new CRow()
--- End diff --

explicitly set the `change` value of `CRow`
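
For example (a sketch, assuming the `CRow` stays a reusable field inside the collector):

```
// set the change flag once and explicitly, e.g. where the collector is created;
// the windowed join only emits insertions, so it never changes afterwards
cRow.change = true
```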


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -382,6 +498,29 @@ abstract class TimeBoundedStreamInnerJoin(
     }
   }

+  /**
+    * Return a padding result with the given left/right row.
+    * @param row the row to pad
+    * @param paddingLeft pad left or right
+    * @return the null padding result
+    */
+  private def paddingResult(row: Row, paddingLeft: Boolean): Row = {
--- End diff --

Move this method into a util class and split it into two methods (`padLeft` and `padRight`). The methods should be `final`.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159014190
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -133,6 +149,19 @@ abstract class TimeBoundedStreamInnerJoin(
     val rightTimerStateDesc: ValueStateDescriptor[Long] =
       new ValueStateDescriptor[Long]("InnerJoinRightTimerState", classOf[Long])
     rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
+
+
+    // Initialize the two reusable padding results.
+    var i = 0
+    while (i < leftArity) {
--- End diff --

Move to util class


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159011164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
         if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
           val rightRows = rightEntry.getValue
           var i = 0
+          var markEmitted = false
           while (i < rightRows.size) {
-            joinFunction.join(leftRow, rightRows.get(i), cRowWrapper)
+            joinCollector.resetThisTurn()
+            val tuple = rightRows.get(i)
+            joinFunction.join(leftRow, tuple.f0, joinCollector)
+            if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
+              if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+                // Mark the right row as being successfully joined and emitted.
+                tuple.f1 = true
+                markEmitted = true
+              }
+            }
             i += 1
           }
+          if (markEmitted) {
+            // Write back the edited entry (mark emitted) for the right cache.
+            rightEntry.setValue(rightRows)
+          }
         }

         if (rightTime <= rightExpirationTime) {
+          if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
--- End diff --

This should be `joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER` because we preserve the records of the right side.

This should be covered by a harness test.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159012517
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -241,17 +288,66 @@ abstract class TimeBoundedStreamInnerJoin(
         if (leftTime >= leftQualifiedLowerBound && leftTime <= leftQualifiedUpperBound) {
           val leftRows = leftEntry.getValue
           var i = 0
+          var markEmitted = false
           while (i < leftRows.size) {
-            joinFunction.join(leftRows.get(i), rightRow, cRowWrapper)
+            joinCollector.resetThisTurn()
+            val tuple = leftRows.get(i)
+            joinFunction.join(tuple.f0, rightRow, joinCollector)
+            if (joinType == JoinType.LEFT_OUTER || joinType == JoinType.FULL_OUTER) {
+              if (!tuple.f1 && joinCollector.everEmittedThisTurn) {
+                // Mark the left row as being successfully joined and emitted.
+                tuple.f1 = true
+                markEmitted = true
+              }
+            }
             i += 1
           }
+          if (markEmitted) {
+            // Write back the edited entry (mark emitted) for the left cache.
+            leftEntry.setValue(leftRows)
+          }
         }
         if (leftTime <= leftExpirationTime) {
+          if (joinType == JoinType.RIGHT_OUTER || joinType == JoinType.FULL_OUTER) {
--- End diff --

`JoinType.RIGHT_OUTER` should be `JoinType.LEFT_OUTER`


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159013964
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
     StreamITCase.compareWithList(expected)
   }

+  // Tests for left outer join
+  @Test
+  def testProcTimeLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t2.c, t1.c
+        |FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+        |    t2.proctime + INTERVAL '3' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  @Test
+  def testRowTimeLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+        |  t1.key = t2.key AND
+        |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+        |    t2.rt + INTERVAL '6' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(String, String, Long)]
+    // for boundary test
+    data1.+=(("A", "L-1", 1000L))
+    data1.+=(("A", "L-2", 2000L))
+    data1.+=(("B", "L-4", 4000L))
+    data1.+=(("A", "L-6", 6000L))
+    data1.+=(("C", "L-7", 7000L))
+    data1.+=(("A", "L-10", 10000L))
+    data1.+=(("A", "L-12", 12000L))
+    data1.+=(("A", "L-20", 20000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
--- End diff --

Add a row to the right data set such that one left row joins with two right rows.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159013510
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StreamWindowJoinHarnessTest.scala
 ---
@@ -20,20 +20,25 @@ package org.apache.flink.table.runtime.harness
 import java.lang.{Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue

+import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector}
-import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin}
+import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin}
 import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit.Test

-class JoinHarnessTest extends HarnessTestBase {
+/**
+  * Since the runtime logic for different stream window joins is identical, we only test
+  * the inner join.
+  */
+class StreamWindowJoinHarnessTest extends HarnessTestBase {
--- End diff --

We should also add harness tests for the outer joins. These are the only tests that can test certain edge cases because the order of inputs can be precisely controlled.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159011797
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/JoinAwareCollector.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Collector to track whether there's a joined result.
+  */
+class JoinAwareCollector extends Collector[Row]{
--- End diff --

I would make this class simpler and move some of the logic into the join class. Instead of tracking whether something was emitted across multiple resets, I'd just check whether something was emitted since a single reset. The join can keep a flag that tracks whether the input row was emitted by joining against the state. Moreover, I'd make some variables public and remove the accessors to reduce the number of method calls.

If we do this, we don't need:

- `setCollector` (can be set by directly modifying the public var)
- `emittedThisTurn` (we only need one emission flag)
- `resetThisTurn()` (we only need one emission flag)
- `everEmitted` (`emitted` can be directly accessed as a public var)
- `everEmittedThisTurn` (only one emission flag)
- `collectWithoutNotifying` (we can simply emit because we don't have an emission flag across multiple resets)

A rough sketch follows below the list.
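
A minimal sketch of such a simplified collector, assuming the `EmitAwareCollector` name that later review comments refer to (the exact field names, e.g. `innerCollector`, are assumptions):

```
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Sketch only: one public emission flag cleared by reset(), a public wrapped
// collector that the operator sets directly, and a reusable CRow whose change
// flag is set explicitly once (windowed joins only emit insertions).
class EmitAwareCollector extends Collector[Row] {

  var emitted = false
  var innerCollector: Collector[CRow] = _
  val cRow = new CRow()

  def reset(): Unit = {
    emitted = false
  }

  override def collect(record: Row): Unit = {
    emitted = true
    cRow.row = record
    innerCollector.collect(cRow)
  }

  override def close(): Unit = innerCollector.close()
}
```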


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159014072
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
     StreamITCase.compareWithList(expected)
   }

+  // Tests for left outer join
+  @Test
+  def testProcTimeLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t2.c, t1.c
+        |FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+        |    t2.proctime + INTERVAL '3' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))
+    data1.+=((1, 5L, "Hi3"))
+    data1.+=((2, 7L, "Hi5"))
+
+    val data2 = new mutable.MutableList[(Int, Long, String)]
+    data2.+=((1, 1L, "HiHi"))
+    data2.+=((2, 2L, "HeHe"))
+
+    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime)
+    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select('a, 'b, 'c, 'proctime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  @Test
+  def testRowTimeLeftOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+        |  t1.key = t2.key AND
+        |  t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+        |    t2.rt + INTERVAL '6' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(String, String, Long)]
+    // for boundary test
+    data1.+=(("A", "L-1", 1000L))
+    data1.+=(("A", "L-2", 2000L))
+    data1.+=(("B", "L-4", 4000L))
+    data1.+=(("A", "L-6", 6000L))
+    data1.+=(("C", "L-7", 7000L))
+    data1.+=(("A", "L-10", 10000L))
+    data1.+=(("A", "L-12", 12000L))
+    data1.+=(("A", "L-20", 20000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-6", 6000L))
+    data2.+=(("B", "R-7", 7000L))
+    data2.+=(("D", "R-8", 8000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String]
+    expected.add("A,R-6,L-1")
+    expected.add("A,R-6,L-2")
+    expected.add("A,R-6,L-6")
+    expected.add("A,R-6,L-10")
+    expected.add("A,R-6,L-12")
+    expected.add("B,R-7,L-4")
+    expected.add("null,null,L-7")
+    expected.add("null,null,L-20")
+    StreamITCase.compareWithList(expected)
+  }
+
+  // Tests for right outer join
+  @Test
+  def testProcTimeRightOuterJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQuery =
+      """
+        |SELECT t2.a, t2.c, t1.c
+        |FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON
+        |  t1.a = t2.a AND
+        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+        |    t2.proctime + INTERVAL '3' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String)]
+    data1.+=((1, 1L, "Hi1"))
+    data1.+=((1, 2L, "Hi2"))


---

[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159010693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -182,16 +196,64 @@ abstract class TimeBoundedStreamInnerJoin(
         if (rightTime >= rightQualifiedLowerBound && rightTime <= rightQualifiedUpperBound) {
           val rightRows = rightEntry.getValue
           var i = 0
+          var markEmitted = false
--- End diff --

rename to `entryUpdated`


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159009795
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -62,15 +65,23 @@ abstract class TimeBoundedStreamInnerJoin(
   with Compiler[FlatJoinFunction[Row, Row, Row]]
   with Logging {

-  private var cRowWrapper: CRowWrappingCollector = _
+  private val leftArity = leftType.getArity
+  private val rightArity = rightType.getArity
+  private val resultArity = leftArity + rightArity
+
+  // two reusable padding results
+  private val leftNullPaddingResult = new Row(resultArity)
--- End diff --

I think we can move the code to generate padded results into a util class that can be reused by other joins.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r159010195
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala
 ---
@@ -143,29 +172,14 @@ abstract class TimeBoundedStreamInnerJoin(
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {

+    joinCollector.setCollector(out)
--- End diff --

Directly set the variable to avoid the method call (like `CRowWrappingCollector`).
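
For example (a sketch, assuming the wrapped collector is exposed as a public var named `innerCollector`, as suggested for the simplified collector elsewhere in this thread):

```
// direct field write instead of a setter call on the per-record path
joinCollector.innerCollector = out
```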


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5140#discussion_r158737260
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -142,50 +143,47 @@ class DataStreamWindowJoin(
       s"${joinConditionToString(schema.relDataType, joinCondition, getExpressionString)}), " +
       s"join: (${joinSelectionToString(schema.relDataType)})"

-    joinType match {
-      case JoinRelType.INNER =>
-        if (relativeWindowSize < 0) {
-          LOG.warn(s"The relative window size $relativeWindowSize is negative," +
-            " please check the join conditions.")
-          createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
-        } else {
-          if (isRowTime) {
-            createRowTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          } else {
-            createProcTimeInnerJoin(
-              leftDataStream,
-              rightDataStream,
-              returnTypeInfo,
-              joinOpName,
-              joinFunction.name,
-              joinFunction.code,
-              leftKeys,
-              rightKeys
-            )
-          }
-        }
-      case JoinRelType.FULL =>
-        throw new TableException(
-          "Full join between stream and stream is not supported yet.")
-      case JoinRelType.LEFT =>
-        throw new TableException(
-          "Left join between stream and stream is not supported yet.")
-      case JoinRelType.RIGHT =>
-        throw new TableException(
-          "Right join between stream and stream is not supported yet.")
+    val flinkJoinType = joinType match {
+      case JoinRelType.INNER => JoinType.INNER
+      case JoinRelType.FULL => JoinType.FULL_OUTER
+      case JoinRelType.LEFT => JoinType.LEFT_OUTER
+      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
+    }
+
+    if (relativeWindowSize < 0) {
+      LOG.warn(s"The relative window size $relativeWindowSize is negative," +
+        " please check the join conditions.")
+      createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
--- End diff --

Empty outer joins need to be handled differently than empty inner joins because the records of the outer side(s) must be preserved and padded with nulls. Hence, we need to pass the join type and the generated code.


---


[GitHub] flink pull request #5140: [FLINK-7797] [table] Add support for windowed oute...

2017-12-08 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5140

[FLINK-7797] [table] Add support for windowed outer joins for streaming tables

## What is the purpose of the change

This PR adds support for windowed outer joins for streaming tables.

## Brief change log

  - Adjusts the plan translation logic to accept stream window outer joins.
  - Attaches an ever-emitted flag to each row. When a row is removed from the cache (or detected as not cached), a null-padding join result is emitted if necessary.
  - Adds a custom `JoinAwareCollector` to track whether there is a successfully joined result for both sides in each join loop.
  - Adds table/SQL translation tests, and also join integration tests. Since the runtime logic is built on the existing window inner join, no new harness tests are added.
  - Updates the SQL/Table API docs.

## Verifying this change

This PR can be verified by the cases added in `JoinTest` and `JoinITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (**yes**)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (**removes the restriction notes**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7797

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5140


commit 34d3fde8049ec407849b61901acd8258a6a1f919
Author: Xingcan Cui 
Date:   2017-12-07T17:28:40Z

[FLINK-7797] [table] Add support for windowed outer joins for streaming tables




---