[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-06 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143211000
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param genJoinFuncName the function name of other non-equi conditions
+  * @param genJoinFuncCode the function code of other non-equi conditions
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String,
+    private val leftTimeIdx: Int,
+    private val rightTimeIdx: Int)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private var leftExpirationTime: Long = 0L;
+  private var rightExpirationTime: Long = 0L;
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  /**
+    * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+    * Only meaningful for row-time joins.
+    *
+    * @return the maximum delay for the outputs
+    */
+ 
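The two criteria in the class comment describe the same window from either side: L.time lies in [R.time + X, R.time + Y] exactly when R.time lies in [L.time - Y, L.time - X]. A minimal plain-Scala sketch (no Flink dependency; `TimeBoundSketch`, `Bounds`, `leftView` and `rightView` are hypothetical names, and both ends are assumed inclusive, as with SQL BETWEEN) checks the two forms and derives `leftRelativeSize = -X` and `rightRelativeSize = Y` the same way the constructor above does:

```scala
// Hypothetical helper illustrating the join window; not part of Flink.
object TimeBoundSketch {
  // X = leftLowerBound and Y = leftUpperBound, as in the class comment.
  final case class Bounds(leftLowerBound: Long, leftUpperBound: Long) {
    // Assumption: a well-formed window has X <= Y.
    require(leftLowerBound <= leftUpperBound, "lower bound must not exceed upper bound")
    // Same derivation as the constructor fields of TimeBoundedStreamInnerJoin.
    val leftRelativeSize: Long = -leftLowerBound
    val rightRelativeSize: Long = leftUpperBound
  }

  // Criterion 1: L.time BETWEEN R.time + X AND R.time + Y (both ends inclusive).
  def leftView(b: Bounds, lTime: Long, rTime: Long): Boolean =
    lTime >= rTime + b.leftLowerBound && lTime <= rTime + b.leftUpperBound

  // Criterion 2: R.time BETWEEN L.time - Y AND L.time - X (both ends inclusive).
  def rightView(b: Bounds, lTime: Long, rTime: Long): Boolean =
    rTime >= lTime - b.leftUpperBound && rTime <= lTime - b.leftLowerBound
}
```

With X = -5000 and Y = 2000, a left row at 10000 can join right rows in [8000, 15000], and the relative sizes come out as leftRelativeSize = 5000 and rightRelativeSize = 2000.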

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-06 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r143208392
  

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-10-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r142891284
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,410 @@
+  private var leftExpirationTime: Long = 0L;
--- End diff --

Forgive me for the repeated silly sequelae 🤦‍♂️ 
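The quoted fields keep one timer timestamp per stream in a `ValueState[Long]`, with 0 meaning that no timer is set. A minimal plain-Scala sketch of that sentinel pattern follows; `TimerSlot` and `registerIfAbsent` are hypothetical names, a plain `var` stands in for the keyed `ValueState`, and the `register` callback stands in for a call such as `ctx.timerService().registerEventTimeTimer(time)`.

```scala
// Hypothetical stand-in for a keyed ValueState[Long] holding a timer timestamp.
// 0L is the sentinel for "no timer set", matching the comment in the diff.
final class TimerSlot {
  private var registered: Long = 0L

  // Current value of the slot (0L when no timer is pending).
  def current: Long = registered

  // Registers `time` through `register` only if no timer is pending.
  // Returns true if a timer was registered, false if one was already set.
  def registerIfAbsent(time: Long, register: Long => Unit): Boolean =
    if (registered == 0L) {
      register(time)
      registered = time
      true
    } else false

  // Called from the timer callback: reset to the sentinel so a new timer can be set.
  def clear(): Unit = registered = 0L
}
```

In Scala, reading an empty `ValueState[Long]` yields `0L` because unboxing `null` produces zero, which is presumably why 0 serves as the sentinel. The trade-off is that a genuine timer at timestamp 0 would be indistinguishable from "no timer".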


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-29 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r141993041
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long = rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+
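In the new `processElement1`/`processElement2`, each incoming row computes the timestamp range of the opposite stream it may join with. The sketch below restates that arithmetic in plain Scala and probes a time-keyed map standing in for the `MapState[Long, JList[Row]]` caches; `OppositeRangeSketch`, `Sizes`, `boundsForLeftRow`, `boundsForRightRow` and `probe` are hypothetical names, and both bounds are assumed inclusive.

```scala
// Hypothetical restatement of the opposite-range arithmetic; not Flink code.
object OppositeRangeSketch {
  // leftRelativeSize = -X and rightRelativeSize = Y, as derived in the constructor.
  final case class Sizes(leftRelativeSize: Long, rightRelativeSize: Long)

  // Right-stream timestamps a left row at `rowTime` may join with (processElement1).
  def boundsForLeftRow(s: Sizes, rowTime: Long): (Long, Long) =
    (rowTime - s.rightRelativeSize, rowTime + s.leftRelativeSize)

  // Left-stream timestamps a right row at `rowTime` may join with (processElement2).
  def boundsForRightRow(s: Sizes, rowTime: Long): (Long, Long) =
    (rowTime - s.leftRelativeSize, rowTime + s.rightRelativeSize)

  // Collect cached rows whose timestamp key falls inside [lower, upper], in time order.
  def probe[R](cache: Map[Long, List[R]], bounds: (Long, Long)): List[R] = {
    val (lower, upper) = bounds
    cache.toList
      .filter { case (t, _) => t >= lower && t <= upper }
      .sortBy(_._1)
      .flatMap(_._2)
  }
}
```

With leftRelativeSize = 5000 and rightRelativeSize = 2000, a left row at 10000 probes right timestamps [8000, 15000], while a right row at 10000 probes left timestamps [5000, 12000]. A left/right pair qualifies from one side exactly when it qualifies from the other, so each pair is found once, by whichever row arrives second.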

[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...

2017-09-24 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4625
  
Hi @fhueske, the PR has been updated. For now, I have kept the logic for dealing with late data, as well as the fine-grained cache.

As for the late-data semantics, I think we need to rethink them and make a final decision (maybe we should consult others). As for the cache optimization, I'd like to leave it as future work.
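Here "fine-grained cache" presumably refers to the `MapState[Long, JList[Row]]` layout in the diffs, where every distinct timestamp gets its own map entry. The sketch below illustrates that layout and timestamp-based eviction in plain Scala; `FineGrainedCache`, `add`, `expire` and `rowsAt` are hypothetical names, and how the expiration time is chosen is deliberately left to the caller, since the late-data semantics are still open in this discussion.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MapState[Long, JList[Row]]: one entry per exact timestamp.
final class FineGrainedCache[R] {
  private val entries = mutable.HashMap.empty[Long, mutable.ArrayBuffer[R]]

  // Append a row under its own timestamp (the "fine-grained" key).
  def add(time: Long, row: R): Unit = {
    entries.getOrElseUpdate(time, mutable.ArrayBuffer.empty[R]) += row
    ()
  }

  // Drop every timestamp at or below `expirationTime`; returns how many keys were removed.
  def expire(expirationTime: Long): Int = {
    val expired = entries.keys.filter(_ <= expirationTime).toList
    expired.foreach(t => entries.remove(t))
    expired.size
  }

  // Number of distinct timestamps currently cached.
  def keyCount: Int = entries.size

  // Rows cached under exactly `time`, in insertion order.
  def rowsAt(time: Long): List[R] = entries.get(time).map(_.toList).getOrElse(Nil)
}
```

Keying by exact timestamp makes eviction precise but can create many small entries; coarser keys (for example, time buckets) would reduce the entry count at the cost of filtering rows by timestamp while probing.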


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-23 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140645274
  

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140266297
  

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140255052
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.util.{ArrayList, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  * Two kinds of time criteria:
+  * "L.time between R.time + X and R.time + Y" or "R.time between L.time - Y and L.time - X".
+  *
+  * @param leftLowerBound  the lower bound for the left stream (X in the criteria)
+  * @param leftUpperBound  the upper bound for the left stream (Y in the criteria)
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftType        the input type of the left stream
+  * @param rightType       the input type of the right stream
+  * @param genJoinFuncName the function name of other non-equi conditions
+  * @param genJoinFuncCode the function code of other non-equi conditions
+  * @param timeIndicator   indicates whether the join is on processing time or row time
+  *
+  */
+abstract class TimeBoundedStreamInnerJoin(
+    private val leftLowerBound: Long,
+    private val leftUpperBound: Long,
+    private val allowedLateness: Long,
+    private val leftType: TypeInformation[Row],
+    private val rightType: TypeInformation[Row],
+    private val genJoinFuncName: String,
+    private val genJoinFuncCode: String,
+    private val leftTimeIdx: Int,
+    private val rightTimeIdx: Int,
+    private val timeIndicator: JoinTimeIndicator)
+  extends CoProcessFunction[CRow, CRow, CRow]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store rows from the left stream
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store rows from the right stream
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  protected var leftOperatorTime: Long = 0L
+  protected var rightOperatorTime: Long = 0L
+
+  // for delayed cleanup
+  private val cleanupDelay = (leftRelativeSize + rightRelativeSize) / 2
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+  /**
+    * Get the maximum interval between receiving a row and emitting it (as part of a joined result).
+    * Only meaningful for row-time joins.
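This earlier revision takes a `timeIndicator: JoinTimeIndicator`, and the `open()` hunk (`@@ -131,340 +116,308 @@`) builds every state name by prefixing it, as in `timeIndicator + "InnerJoinLeftCache"`. The enumeration itself is not shown here; the sketch below assumes a standard Scala `Enumeration` with a `type JoinTimeIndicator = Value` alias, which the import path suggests, and the value names `ROWTIME` and `PROCTIME` as well as the `StateNames` helper are hypothetical.

```scala
// Hypothetical reconstruction of an enumeration shaped like the one imported as
// org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator.
object JoinTimeIndicator extends Enumeration {
  type JoinTimeIndicator = Value
  // Explicit names keep toString stable.
  val ROWTIME: Value = Value("ROWTIME")
  val PROCTIME: Value = Value("PROCTIME")
}

import JoinTimeIndicator.JoinTimeIndicator

// Hypothetical helper producing the same names as the concatenations in open().
object StateNames {
  def leftCache(ti: JoinTimeIndicator): String = s"${ti}InnerJoinLeftCache"
  def rightCache(ti: JoinTimeIndicator): String = s"${ti}InnerJoinRightCache"
  def leftTimer(ti: JoinTimeIndicator): String = s"${ti}InnerJoinLeftTimerState"
  def rightTimer(ti: JoinTimeIndicator): String = s"${ti}InnerJoinRightTimerState"
}
```

String interpolation is used here instead of `+` because adding a `String` to an arbitrary value goes through `any2stringadd`, which is deprecated as of Scala 2.13.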
+  

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-21 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r140251765
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(
 // Initialize the data caches.
 val leftListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](leftType)
 val leftStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + 
"InnerJoinLeftCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
leftListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinLeftCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+leftListTypeInfo)
 leftCache = getRuntimeContext.getMapState(leftStateDescriptor)
 
 val rightListTypeInfo: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](rightType)
 val rightStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
-  new MapStateDescriptor[Long, JList[Row]](timeIndicator + "InnerJoinRightCache",
-BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rightListTypeInfo)
+  new MapStateDescriptor[Long, JList[Row]](
+timeIndicator + "InnerJoinRightCache",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
+rightListTypeInfo)
 rightCache = getRuntimeContext.getMapState(rightStateDescriptor)
 
 // Initialize the timer states.
 val leftTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinLeftTimerState", classOf[Long])
 leftTimerState = getRuntimeContext.getState(leftTimerStateDesc)
 
 val rightTimerStateDesc: ValueStateDescriptor[Long] =
-  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState",
-classOf[Long])
+  new ValueStateDescriptor[Long](timeIndicator + "InnerJoinRightTimerState", classOf[Long])
 rightTimerState = getRuntimeContext.getState(rightTimerStateDesc)
   }
 
   /**
-* Process records from the left stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to register timer or get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the left stream.
 */
   override def processElement1(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, true)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForLeftStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - rightRelativeSize
+val oppositeUpperBound: Long = rowTime + leftRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   leftOperatorTime,
+  oppositeLowerBound,
+  oppositeUpperBound,
   rightOperatorTime,
   rightTimerState,
   leftCache,
   rightCache,
-  true
+  leftRow = true
 )
   }
 
   /**
-* Process records from the right stream.
-*
-* @param cRowValue the input record
-* @param ctx   the context to get current time
-* @param out   the collector for outputting results
-*
+* Process rows from the right stream.
 */
   override def processElement2(
-cRowValue: CRow,
-ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
-out: Collector[CRow]): Unit = {
-val timeForRecord: Long = getTimeForRecord(ctx, cRowValue, false)
-getCurrentOperatorTime(ctx)
+  cRowValue: CRow,
+  ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+updateOperatorTime(ctx)
+val rowTime: Long = getTimeForRightStream(ctx, cRowValue)
+val oppositeLowerBound: Long = rowTime - leftRelativeSize
+val oppositeUpperBound: Long =  rowTime + rightRelativeSize
 processElement(
   cRowValue,
-  timeForRecord,
+  rowTime,
   ctx,
   out,
   rightOperatorTime,
+
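
For each incoming row, `processElement1` and `processElement2` above compute the time range of opposite-side rows that the row may join with. The small sketch below reproduces that arithmetic with hypothetical names and plain `Long` values instead of Flink state. It shows that both formulas describe the same predicate, `-leftRelativeSize <= left.time - right.time <= rightRelativeSize`. This is an illustration, not the PR's implementation.

```scala
// Hypothetical sketch (not Flink API) of the opposite-side join ranges.
object OppositeRange {
  // A left row at rowTime can join right rows in
  // [rowTime - rightRelativeSize, rowTime + leftRelativeSize] (inclusive).
  def forLeftRow(rowTime: Long, leftRelativeSize: Long, rightRelativeSize: Long): (Long, Long) =
    (rowTime - rightRelativeSize, rowTime + leftRelativeSize)

  // A right row at rowTime can join left rows in
  // [rowTime - leftRelativeSize, rowTime + rightRelativeSize] (inclusive).
  def forRightRow(rowTime: Long, leftRelativeSize: Long, rightRelativeSize: Long): (Long, Long) =
    (rowTime - leftRelativeSize, rowTime + rightRelativeSize)

  // The predicate both ranges are derived from, assuming
  // leftLowerBound = -leftRelativeSize and leftUpperBound = rightRelativeSize.
  def joins(leftTime: Long, rightTime: Long, leftRelativeSize: Long, rightRelativeSize: Long): Boolean = {
    val diff = leftTime - rightTime
    diff >= -leftRelativeSize && diff <= rightRelativeSize
  }
}
```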

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139986193
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ---
@@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{
 val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7))
+  CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7))
 expectedOutput.add(new StreamRecord(
-  CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12))
+  CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12))
 
 verify(expectedOutput, result, new RowResultSortComparator())
 
 testHarness.close()
   }
 
+  /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/
+  @Test
+  def testCommonRowTimeJoin() {
--- End diff --

Oh, sorry, I missed this part. I will add it soon.


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-20 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139983961
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139849018
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
--- End diff --

Totally understand 😄 


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139633281
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139631978
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
--- End diff --

After checking the code, I found that the watermark is used even more often
than it is updated (i.e., each update is followed by at least one usage).
Considering that the number of usages may increase in the future, I suggest
keeping this check in the update method. Of course, it would be better if we
could use a "safer" initial value for the watermark😄. What do you think?
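Here is a rough sketch of the option suggested above: guard once in the update method, starting from a "safer" initial value. This is a hypothetical model, not Flink API. It assumes that the watermark reported before any watermark has arrived is `Long.MinValue`. In that case, an expression such as `watermark - allowedLateness` would overflow without the guard.

```scala
// Hypothetical model of guarding the watermark once, when the operator time is updated.
final class OperatorTimeTracker(allowedLateness: Long) {
  // "Safer" initial value: 0 rather than an uninitialized Long.MinValue watermark.
  private var operatorTime: Long = 0L

  // Clamp non-positive watermarks (e.g. the assumed initial Long.MinValue) to 0.
  def update(currentWatermark: Long): Unit = {
    operatorTime = if (currentWatermark > 0) currentWatermark else 0L
  }

  def time: Long = operatorTime

  // Same lateness condition as checkRowOutOfDate in the diff; no extra check needed here.
  def isLate(rowTime: Long): Boolean = rowTime <= operatorTime - allowedLateness
}
```

With the guard in `update`, every usage can rely on plain arithmetic. Without the guard, `Long.MinValue - 5` would wrap around to `Long.MaxValue - 4`, and every row would be considered late.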


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139628583
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-19 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139611419
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -131,340 +116,308 @@ class TimeBoundedStreamInnerJoin(

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-18 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139591493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/RowTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,83 @@
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute row(event) time bounded stream inner-join.
+  */
+class RowTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+leftTimeIdx: Int,
+rightTimeIdx: Int)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = {
+timeForRow <= watermark - allowedLateness
+  }
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+rightOperatorTime =
+  if (ctx.timerService().currentWatermark() > 0) ctx.timerService().currentWatermark()
--- End diff --

Yes, you are right. I'll move this check to places where we actually use 
the watermark.
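The alternative mentioned here keeps the raw watermark and checks it where it is consumed. It could look roughly like the sketch below. The helper names are hypothetical, and the sketch assumes that an uninitialized watermark is non-positive (for example `Long.MinValue`).

```scala
// Hypothetical helpers: keep the raw watermark and guard at each point of use.
object WatermarkUse {
  // Cutoff at or below which rows are late, or None while no meaningful watermark exists.
  def lateCutoff(watermark: Long, allowedLateness: Long): Option[Long] =
    if (watermark > 0) Some(watermark - allowedLateness) else None

  // A row is late only if a meaningful watermark exists and the row is at or below the cutoff.
  def isLate(rowTime: Long, watermark: Long, allowedLateness: Long): Boolean =
    lateCutoff(watermark, allowedLateness).exists(cutoff => rowTime <= cutoff)
}
```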


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-18 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r139585456
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/ProcTimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * The function to execute processing time bounded stream inner-join.
+  */
+class ProcTimeBoundedStreamInnerJoin(
+leftLowerBound: Long,
+leftUpperBound: Long,
+allowedLateness: Long,
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String)
+extends TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  allowedLateness,
+  leftType,
+  rightType,
+  genJoinFuncName,
+  genJoinFuncCode,
+  leftTimeIdx = -1,
+  rightTimeIdx = -1,
+  JoinTimeIndicator.PROCTIME) {
+
+  override def checkRowOutOfDate(timeForRow: Long, watermark: Long) = false
+
+  override def updateOperatorTime(ctx: CoProcessFunction[CRow, CRow, CRow]#Context): Unit = {
+rightOperatorTime = ctx.timerService().currentProcessingTime()
+leftOperatorTime = ctx.timerService().currentProcessingTime()
+  }
+
+  override def getTimeForLeftStream(
+  context: CoProcessFunction[CRow, CRow, CRow]#Context,
+  row: CRow): Long = {
+context.timerService().currentProcessingTime()
--- End diff --

Yes, you are right. To keep them identical, we should return the
`leftOperatorTime` here. However, this couples `updateOperatorTime` and
`getTimeForLeftStream`, i.e., `updateOperatorTime` must be invoked
before `getTimeForLeftStream`. Is that acceptable?

I've got an idea about the processing time. How about temporarily caching
the machine time for the same `StreamRecord` instead of invoking
`System.currentTimeMillis()` each time?
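The caching idea could be sketched as follows. `updateOperatorTime` reads the clock once per element, and `getTimeForLeftStream` returns the cached value, which makes the required call order explicit. `CachedProcTime` and the injected `clock` function are hypothetical. In the operator, the clock would presumably be `ctx.timerService().currentProcessingTime()` or `System.currentTimeMillis()`.

```scala
// Hypothetical sketch: read the processing-time clock once per element and reuse it.
final class CachedProcTime(clock: () => Long) {
  private var cached: Long = Long.MinValue

  // Must be called first for each element; this is the call-order coupling discussed above.
  def updateOperatorTime(): Unit = {
    cached = clock()
  }

  // Returns the value cached by updateOperatorTime instead of re-reading the clock.
  def getTimeForLeftStream: Long = cached
}
```

Injecting the clock also makes the coupling easy to test, because a fake clock can count how often it is read.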


---


[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...

2017-09-12 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4625
  
Hi @fhueske, the PR has been updated. However, there are still some
unfinished tasks, e.g., optimising the data caching and cleanup policies and
distinguishing between the `<` and `<=` signs. I'd like to leave them as future
work. What do you think?
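One possible way to distinguish `<` from `<=` is to normalize strict bounds into inclusive ones before computing the relative sizes. This assumes that row times are integral millisecond `Long` values. The helper below is hypothetical and is not code from the PR.

```scala
// Hypothetical helper: turn possibly-strict bounds on `left.time - right.time`
// into inclusive ones. Assumes integral millisecond timestamps, so
// `x > b` is equivalent to `x >= b + 1` and `x < b` to `x <= b - 1`.
object BoundNormalizer {
  def inclusiveLower(bound: Long, strict: Boolean): Long =
    if (strict) bound + 1 else bound

  def inclusiveUpper(bound: Long, strict: Boolean): Long =
    if (strict) bound - 1 else bound
}
```

Under this assumption, `l.rowtime > r.rowtime - 10 AND l.rowtime < r.rowtime + 20` would map to the inclusive bounds -9 and 19.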

BTW, I've noticed that most of the recent PRs fail on the build jobs with
`TEST = "misc"`. I'm not sure whether there is a problem with the CI. Here is a
[build log](https://s3.amazonaws.com/archive.travis-ci.org/jobs/274508987/log.txt?X-Amz-Expires=30&X-Amz-Date=20170912T130101Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAJRYRXRSVGNKPKO5A/20170912/us-east-1/s3/aws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=af6d9a141ef245be9a7393e69c484785340a9bb1f1030e1039d4ed44f0344a42)
for this PR.


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-06 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137272749
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Map.Entry
+import java.util.{Date, List => JList}
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.join.JoinTimeIndicator.JoinTimeIndicator
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A CoProcessFunction to execute time-bounded stream inner-join.
+  *
+  * Sample criteria:
+  *
+  * L.time between R.time + X and R.time + Y
+  * or, equivalently, R.time between L.time - Y and L.time - X
+  *
+  * @param leftLowerBound  X
+  * @param leftUpperBound  Y
+  * @param allowedLateness the lateness allowed for the two streams
+  * @param leftType        the input type of left stream
+  * @param rightType       the input type of right stream
+  * @param genJoinFuncName the function name of other non-equi conditions
+  * @param genJoinFuncCode the function code of other non-equi conditions
+  * @param leftTimeIdx     the index of the time attribute in the left input row
+  * @param rightTimeIdx    the index of the time attribute in the right input row
+  * @param timeIndicator   indicates whether joining on proctime or rowtime
+  *
+  */
+class TimeBoundedStreamInnerJoin(
+  private val leftLowerBound: Long,
+  private val leftUpperBound: Long,
+  private val allowedLateness: Long,
+  private val leftType: TypeInformation[Row],
+  private val rightType: TypeInformation[Row],
+  private val genJoinFuncName: String,
+  private val genJoinFuncCode: String,
+  private val leftTimeIdx: Int,
+  private val rightTimeIdx: Int,
+  private val timeIndicator: JoinTimeIndicator)
+  extends CoProcessFunction[CRow, CRow, CRow]
+with Compiler[FlatJoinFunction[Row, Row, Row]]
+with Logging {
+
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  // the join function for other conditions
+  private var joinFunction: FlatJoinFunction[Row, Row, Row] = _
+
+  // cache to store the left stream records
+  private var leftCache: MapState[Long, JList[Row]] = _
+  // cache to store right stream records
+  private var rightCache: MapState[Long, JList[Row]] = _
+
+  // state to record the timer on the left stream. 0 means no timer set
+  private var leftTimerState: ValueState[Long] = _
+  // state to record the timer on the right stream. 0 means no timer set
+  private var rightTimerState: ValueState[Long] = _
+
+  private val leftRelativeSize: Long = -leftLowerBound
+  private val rightRelativeSize: Long = leftUpperBound
+
+  private val relativeWindowSize = rightRelativeSize + leftRelativeSize
+
+  private var leftOperatorTime: Long = 0L
+  private var rightOperatorTime: Long = 0L
+
+  private var backPressureSuggestion: Long = 0L
+
+  if (relativeWindowSize <= 0) {
+    LOG.warn("The relative window size is non-positive, please check the join conditions.")
+  }
+
+  if (allowedLateness < 0) {
+    throw new IllegalArgumentException("The allowed lateness must be non-negative.")
+  }
+
+
+  /**
+* For holding back watermarks

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-06 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137205581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-06 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137201317
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137168799
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137163327
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@

[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137158818
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamInnerJoin.scala
 ---
@@ -0,0 +1,533 @@
+  private var backPressureSuggestion: Long = 0L
--- End diff --

This variable stores a suggested value for applying backpressure *in the 
future*. If one of the streams were held back according to this suggestion, we 
could cache fewer records. It's like moving the cache from Flink to upstream 
components (e.g., Kafka).


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137150128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 ---
@@ -115,10 +118,15 @@ object WindowJoinUtil {
   case _ =>
 Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
 }
-
-val bounds = Some(WindowBounds(timePreds.head.isEventTime, leftLowerBound, leftUpperBound))
-
-(bounds, remainCondition)
+if (timePreds.head.leftInputOnLeftSide) {
--- End diff --

Should we also subtract `leftLogicalFieldCnt` from the latter index?
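A sketch of the index adjustment in question, assuming (as in Calcite's join row type) that the right input's fields follow the left input's fields. `JoinIndexSketch.toRightLocalIndex` is a hypothetical helper, and `leftFieldCnt` stands in for `leftLogicalFieldCnt`.

```scala
object JoinIndexSketch {
  // Assumes the join's row type is the left fields followed by the right fields,
  // so a right-side field's local index is its combined index minus the left count.
  def toRightLocalIndex(combinedIdx: Int, leftFieldCnt: Int): Int = {
    require(combinedIdx >= leftFieldCnt, s"index $combinedIdx refers to a left-input field")
    combinedIdx - leftFieldCnt
  }
}
```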


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137146303
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -184,4 +195,54 @@ class DataStreamWindowJoin(
 .returns(returnTypeInfo)
 }
   }
+
+  def createRowTimeInnerJoinFunction(
+leftDataStream: DataStream[CRow],
+rightDataStream: DataStream[CRow],
+joinFunctionName: String,
+joinFunctionCode: String,
+leftKeys: Array[Int],
+rightKeys: Array[Int]): DataStream[CRow] = {
+
+val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
+
+val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  0L,
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  joinFunctionName,
+  joinFunctionCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME
+)
+
+if (!leftKeys.isEmpty) {
+  leftDataStream
+.connect(rightDataStream)
+.keyBy(leftKeys, rightKeys)
+.transform(
+  "rowTimeInnerJoinFunc",
+  returnTypeInfo,
+  new KeyedCoProcessOperatorWithWatermarkDelay[CRow, CRow, CRow, CRow](
+rowTimeInnerJoinFunc,
+rowTimeInnerJoinFunc.getMaxOutputDelay)
--- End diff --

I think "watermark delay" is named from the operator's perspective, while 
"output delay" is named from the function's perspective. So how about keeping 
this name?
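Seen from the operator, the delay is just a number subtracted from incoming watermarks; how the function derives it is the function's concern. The class below is a hypothetical sketch of that operator-side behaviour, not Flink's `KeyedCoProcessOperatorWithWatermarkDelay`. It clamps at `Long.MinValue` to avoid overflow and, as a sketch choice, forwards only watermarks that advance.

```scala
// Hypothetical sketch of holding back watermarks by a fixed delay.
final class WatermarkDelaySketch(delay: Long) {
  require(delay >= 0L, "the watermark delay must be non-negative")

  private var lastEmitted: Long = Long.MinValue

  // Shifts the input watermark back by `delay` (clamped so the subtraction cannot
  // overflow below Long.MinValue) and returns it only if it advances the output.
  def onWatermark(inputWatermark: Long): Option[Long] = {
    val shifted = math.max(inputWatermark, Long.MinValue + delay) - delay
    if (shifted > lastEmitted) {
      lastEmitted = shifted
      Some(shifted)
    } else None
  }
}
```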


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137144634
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
 ---
@@ -184,4 +195,54 @@ class DataStreamWindowJoin(
 .returns(returnTypeInfo)
 }
   }
+
+  def createRowTimeInnerJoinFunction(
+leftDataStream: DataStream[CRow],
+rightDataStream: DataStream[CRow],
+joinFunctionName: String,
+joinFunctionCode: String,
+leftKeys: Array[Int],
+rightKeys: Array[Int]): DataStream[CRow] = {
+
+val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
+
+val rowTimeInnerJoinFunc = new TimeBoundedStreamInnerJoin(
+  leftLowerBound,
+  leftUpperBound,
+  0L,
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  joinFunctionName,
+  joinFunctionCode,
+  leftTimeIdx,
+  rightTimeIdx,
+  JoinTimeIndicator.ROWTIME
+)
+
+if (!leftKeys.isEmpty) {
+  leftDataStream
+.connect(rightDataStream)
+.keyBy(leftKeys, rightKeys)
+.transform(
+  "rowTimeInnerJoinFunc",
--- End diff --

I'd prefer to call this kind of join a "time-bounded join" instead of a 
"window join". When hearing "window join", users may think of tumbling or 
sliding windows, which are actually different. However, since the name 
"window join" is already widely used, I can also accept it. What do you think?


---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-09-05 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4625#discussion_r137137915
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
 ---
@@ -55,8 +55,10 @@ class DataStreamWindowJoinRule
 
 if (windowBounds.isDefined) {
   if (windowBounds.get.isEventTime) {
-// we cannot handle event-time window joins yet
-false
+val procTimeAttrInOutput = join.getRowType.getFieldList.asScala
+  .exists(f => FlinkTypeFactory.isProctimeIndicatorType(f.getType))
+
+!remainingPredsAccessTime && !procTimeAttrInOutput
--- End diff --

Should we also keep the rowtime attributes in the output of the proctime join?


---


[GitHub] flink issue #4625: [FLINK-6233] [table] Support time-bounded stream inner jo...

2017-09-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4625
  
Thanks for the review, @fhueske. This PR was a little rough when I 
submitted it. I'll address your comments and submit a refined version as soon 
as possible.

Best, Xingcan


---


[GitHub] flink issue #4633: [FLINK-7564] [table] Fix Watermark semantics in RowTimeUn...

2017-09-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4633
  
I see. Thanks :-)


---


[GitHub] flink issue #4633: [FLINK-7564] [table] Fix Watermark semantics in RowTimeUn...

2017-09-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4633
  
Thanks for the review, @fhueske. I tried to consolidate the logic for 
proctime and rowtime watermark processing, but failed. The reason is that with 
proctime, the "watermark" (not sure if this concept exists for proctime) used 
for triggering the timers is identical to the current record's processing 
time, so all records would be considered late. Do you think it's necessary to 
unify their behaviors, or should we just treat proctime as a special case?
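
The problem can be reproduced with a minimal model that uses hypothetical names. It assumes the rowtime lateness check (timestamp `<=` watermark). It models proctime by using the current processing time both as the record's timestamp and as the timer-driving "watermark".

```scala
// Hypothetical model, not Flink code.
object ProctimeLateness {
  // Rowtime rule: a row is late if its timestamp is not after the watermark.
  def isLate(timestamp: Long, watermark: Long): Boolean = timestamp <= watermark

  // Proctime: the record time and the "watermark" are the same clock reading,
  // so the rowtime rule flags every record as late.
  def lateUnderProctime(processingTimes: Seq[Long]): Seq[Boolean] =
    processingTimes.map(now => isLate(now, now))
}
```

Because `isLate(now, now)` is always `true`, reusing the rowtime check unchanged would drop every proctime record under these assumptions.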


---


[GitHub] flink pull request #4633: [FLINK-7564] [table] Fix Watermark semantics in Ro...

2017-09-01 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4633

[FLINK-7564] [table] Fix Watermark semantics in RowTimeUnboundedOver

## What is the purpose of the change

This PR aims to fix the watermark boundary check problem (mentioned in [this thread](https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E.)) in the Table API.


## Brief change log

- Change the lateness condition in RowTimeUnboundedOver so that a row is 
considered late if its timestamp is <= the current watermark.
- Add tests for late data to OverWindowHarnessTest.
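
The new condition follows Flink's watermark contract. A watermark with timestamp `t` declares that no more rows with timestamp `<= t` are expected, so any such row that still arrives is late. The Flink-independent sketch below shows how a harness-style test could interleave rows and watermarks and drop the late rows. The event types are hypothetical stand-ins, not the utilities used by `OverWindowHarnessTest`.

```scala
// Hypothetical sketch, not the OverWindowHarnessTest utilities.
object LateRowFilter {
  sealed trait Event
  final case class Row(timestamp: Long) extends Event
  final case class Watermark(timestamp: Long) extends Event

  // Replays rows and watermarks in arrival order and returns the timestamps of
  // rows that are on time, i.e. strictly greater than the current watermark.
  def onTime(events: Seq[Event]): Seq[Long] = {
    var currentWatermark = Long.MinValue
    val kept = Seq.newBuilder[Long]
    events.foreach {
      case Watermark(t) => currentWatermark = math.max(currentWatermark, t)
      case Row(t) if t <= currentWatermark => () // late: dropped
      case Row(t) => kept += t
    }
    kept.result()
  }
}
```

For the sequence row 1, watermark 2, rows 2 and 3, watermark 5, rows 4 and 6, only rows 1, 3 and 6 are kept. Row 2 is dropped because it equals the watermark, which is exactly the boundary case this PR changes.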


## Verifying this change

This change is already covered by OverWindowHarnessTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (N/A)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7564

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4633


commit 5312ac421384436d7170d8f1c2e00aba9e670044
Author: Xingcan Cui 
Date:   2017-09-01T01:16:21Z

[FLINK-7564] [table] Fix Watermark semantics in Table API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4625: [FLINK-6233] [table] Support time-bounded stream i...

2017-08-30 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4625

[FLINK-6233] [table] Support time-bounded stream inner join in the SQL API

## What is the purpose of the change

This PR aims to add an implementation of the time-bounded stream inner join 
for both proctime and rowtime in the SQL API, e.g., ``SELECT * FROM L, R 
WHERE L.pid = R.pid AND L.time BETWEEN R.time + X AND R.time + Y``. A design 
document for this problem can be found [here](http://goo.gl/VW5Gpd).
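
The intended result of the example query can be pinned down with a naive nested-loop join over finite inputs, which may help when writing expected outputs for tests. This is only a semantic reference, not how a streaming operator evaluates the join. `Rec`, its fields, and the parameter names `x`/`y` (standing for `X`/`Y`) are hypothetical.

```scala
// Hypothetical semantic reference, not Flink code.
object BoundedJoinReference {
  final case class Rec(pid: Int, time: Long, payload: String)

  // SELECT * FROM L, R
  // WHERE L.pid = R.pid AND L.time BETWEEN R.time + x AND R.time + y
  def join(left: Seq[Rec], right: Seq[Rec], x: Long, y: Long): Seq[(Rec, Rec)] =
    for {
      l <- left
      r <- right
      if l.pid == r.pid
      if l.time >= r.time + x && l.time <= r.time + y
    } yield (l, r)
}
```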


## Brief change log

  - Fill in the missing part of the compilation stage for the rowtime stream 
inner join.
  - Add logic to `WindowJoinUtil` to extract the rowtime indices.
  - Provide a general `TimeBoundedStreamInnerJoin`.
  - Add a `TimeBoundedJoinExample` and some new tests to `JoinHarnessTest` to 
test the new join function.
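
When rows are buffered in state, a natural question is when a buffered row can no longer find a partner. The model below makes four assumptions:

  - the join uses rowtime;
  - the bounds have the form `L.time BETWEEN R.time + X AND R.time + Y`;
  - there is no extra allowed lateness;
  - rows with a timestamp `<=` the current watermark are discarded as late.

Under these assumptions, a left row at `lt` only matches right rows with `rt` in `[lt - Y, lt - X]`, so it can be dropped once the watermark reaches `lt - X`. A right row at `rt` only matches left rows in `[rt + X, rt + Y]`, so it can be dropped once the watermark reaches `rt + Y`. This is a hypothetical sketch, not the cleanup logic of `TimeBoundedStreamInnerJoin`.

```scala
// Hypothetical model of state expiry, not the TimeBoundedStreamInnerJoin code.
object BoundedJoinExpiry {
  // Watermark at which a buffered left row (time leftTime) can be dropped.
  def leftExpiry(leftTime: Long, x: Long): Long = leftTime - x

  // Watermark at which a buffered right row (time rightTime) can be dropped.
  def rightExpiry(rightTime: Long, y: Long): Long = rightTime + y

  // Keep only the rows that may still find a join partner.
  def prune(buffer: Seq[Long], watermark: Long, expiry: Long => Long): Seq[Long] =
    buffer.filter(t => expiry(t) > watermark)
}
```

With `X = -5` and `Y = 5` (rows at most 5 ms apart), a left row at 100 is kept at watermark 104 because a right row at 105 could still arrive. It can be dropped once the watermark reaches 105.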

## Verifying this change

This change added tests to the existing JoinHarnessTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
  - The serializers: (**no**)
  - The runtime per-record code paths (performance sensitive): (**no**)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (**not documented yet**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6233

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4625.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4625


commit c79588b134a1270956a6d32b7a0a13ff4e3f483d
Author: Xingcan Cui 
Date:   2017-08-30T05:57:38Z

[FLINK-6233] [table] Support rowtime inner equi-join between two streams in 
the SQL API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-29 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
I totally understand the choice, @fhueske 😄 
Thanks for the refactoring. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-18 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comment, @aljoscha. IMO, making `timeServiceManager` 
protected would indeed minimise the impact on `AbstractStreamOperator`, but it 
may introduce duplicated code in the subclasses. It's a trade-off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-17 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comments, @fhueske. I will pay more attention to the coding 
style. 

Actually, there are many ways to implement this feature. At first, I planned 
to override the `processWatermark` method in the subclass. However, the 
instance variable `timeServiceManager` that it needs is declared private. I'm 
not sure if this can be changed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-14 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
@fhueske Yes, the plural is better. I should have noticed that before. 
The PR has been updated with the new package name, and an extra delay 
parameter has been added to the co-operator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-14 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the suggestion, @aljoscha. Do you think it's appropriate to add 
a new package `org.apache.flink.table.runtime.operator`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132842811
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
   throw new TableException("Field name can not be '*'.")
 }
 
-(fieldNames.toArray, fieldIndexes.toArray)
+(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

In my local environment, `toArray` also seems to be redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132833668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+function: MapFunction[CRow, OUT],
+rowtimeIdx: Int)
--- End diff --

It seems that this function only changes the data type of the rowtime field 
from Long to Timestamp. Should we consider making `rowtimeIdx` an array? 
Besides, as @wuchong suggested, I also think a query should keep the data 
types unchanged.
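
The array variant might look like the hypothetical sketch below. It models rows as `Array[Any]` and assumes rowtime values are epoch milliseconds stored as `Long`. It uses the plain `java.sql.Timestamp(long)` constructor. The actual function may convert differently (the diff imports Calcite's `SqlFunctions`), so treat the conversion itself as an assumption.

```scala
import java.sql.Timestamp

// Hypothetical sketch, not OutputRowtimeProcessFunction.
object RowtimeFieldConversion {
  // Replaces each Long rowtime field (epoch millis) at the given indices with a
  // java.sql.Timestamp; all other fields are copied unchanged.
  def convert(row: Array[Any], rowtimeIndices: Array[Int]): Array[Any] = {
    val out = row.clone()
    rowtimeIndices.foreach { i =>
      out(i) = row(i) match {
        case millis: Long => new Timestamp(millis)
        case other => other // leave non-Long values (including null) untouched
      }
    }
    out
  }
}
```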


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132820853
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I have an idea, but I'm not sure if it's applicable. We could allow 
multiple rowtime fields in a stream but activate only one of them in each 
operator. Since the timestamps are stored in the records, the inactive rowtime 
fields can simply be treated as regular fields. Any modification of a rowtime 
field would make it invalid for rowtime use. IMO, not many queries depend on 
rowtime (maybe only over aggregates and joins), so the optimizer may be able to 
deduce which rowtime field should be activated in an operator. However, some 
existing logic may be affected by this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-12 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/4532#discussion_r132817252
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

Thanks for the PR, @fhueske and @twalthr. I tried to rebase my rowtime 
join code on this branch, but encountered this exception. The test SQL is 
`SELECT * FROM OrderA, OrderB WHERE OrderA.productA = OrderB.productB AND 
OrderB.rtB BETWEEN OrderA.rtA AND OrderA.rtA + INTERVAL '2' SECOND`. What 
should I do to *cast all other fields to TIMESTAMP*?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4530: [FLINK-7245] [stream] Support holding back waterma...

2017-08-11 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4530

[FLINK-7245] [stream] Support holding back watermarks with static delays

## What is the purpose of the change

*This pull request allows operators to hold back watermarks with **static** 
delays.*


## Brief change log

  - *Introduce a new method `getWatermarkToEmit(Watermark inputWatermark)`, 
which allows generating a new watermark with a different timestamp before it 
is emitted.*
  - *Add two operators `KeyedProcessOperatorWithWatermarkDelay` and 
`KeyedCoProcessOperatorWithWatermarkDelay` that support holding back watermarks 
with static delays.*
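
The idea can be sketched independently of Flink's operator classes. The emitted watermark is the input watermark shifted back by a fixed, non-negative delay. In this sketch the `Watermark` case class is a stand-in for Flink's own `Watermark`, `DelayedWatermarkEmitter` is a hypothetical name, and saturating at `Long.MinValue` instead of overflowing is an assumption of the sketch.

```scala
// Hypothetical sketch, not the KeyedProcessOperatorWithWatermarkDelay code.
object WatermarkDelay {
  // Stand-in for Flink's Watermark class; only the timestamp matters here.
  final case class Watermark(timestamp: Long)

  final class DelayedWatermarkEmitter(delay: Long) {
    require(delay >= 0, s"The watermark delay must be non-negative, but was $delay")

    // Shift the incoming watermark back by `delay`, saturating at Long.MinValue
    // instead of overflowing.
    def getWatermarkToEmit(input: Watermark): Watermark =
      if (input.timestamp < Long.MinValue + delay) Watermark(Long.MinValue)
      else Watermark(input.timestamp - delay)
  }
}
```

Holding the watermark back this way gives downstream operators extra time before they treat rows as late. A negative delay is rejected up front, as the new tests also check.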

## Verifying this change

This change is verified by two new test classes, 
`KeyedProcessOperatorWithWatermarkDelayTest` and 
`KeyedCoProcessOperatorWithWatermarkDelayTest`. They test whether watermarks 
received by the two new operators are held back by the given delays, and 
whether the provided delays are required to be non-negative.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
  - The serializers: (**no**)
  - The runtime per-record code paths (performance sensitive): (**no**)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (**JavaDocs**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7245

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4530


commit a24a11522af54c547d014d30adbefa23997d0f8d
Author: Xingcan Cui 
Date:   2017-08-09T12:54:16Z

[FLINK-7245] [stream] Support holding back watermarks with static delays

commit f730ab45c88f8bcbc27e411901e27dee84aa26b2
Author: Xingcan Cui 
Date:   2017-08-11T16:15:53Z

Refine codes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4297: [FLINK-6936] [streaming] Add multiple targets supp...

2017-07-11 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4297

[FLINK-6936] [streaming] Add multiple targets support for custom partitioner

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6936

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4297


commit 754a8ad7ae316a09edc2bf0ea957006aee02b595
Author: Xingcan Cui 
Date:   2017-07-07T12:25:39Z

[FLINK-6936] [streaming] Add multiple targets support for custom partitioner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3768: [FLINK-6368][table] Grouping keys in stream aggreg...

2017-04-25 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/3768

[FLINK-6368][table] Grouping keys in stream aggregations have wrong order

FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage, 
which mapped all grouping keys to the first n fields of a record. That is why 
older versions generated new, shifted grouping keys (`val groupingKeys = 
grouping.indices.toArray`) from the original keys' indices. Now that this 
mapping has been removed, we should use the original grouping keys rather than 
the shifted ones. In addition, the test method posted in 
https://issues.apache.org/jira/browse/FLINK-6368 has been added to 
DataStreamAggregateITCase.
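
The bug can be illustrated with plain arrays. The data and helper names below are hypothetical, not Table API internals. While the prepare-map stage moved the grouping fields to the front of each record, the shifted positions `0 until n` addressed them correctly. Without that stage, the same shifted positions select the wrong fields, and the original indices must be used instead.

```scala
// Hypothetical sketch with records modeled as Array[Any]; not Table API code.
object GroupingKeyIndices {
  // Extracts the key fields of a record at the given positions.
  def keyOf(record: Array[Any], keyIndices: Seq[Int]): Seq[Any] =
    keyIndices.map(i => record(i))

  // Models the removed prepare-map stage: grouping fields first, then the rest.
  def prepareMap(record: Array[Any], keyIndices: Seq[Int]): Array[Any] = {
    val rest = record.indices.filterNot(i => keyIndices.contains(i))
    (keyIndices ++ rest).map(i => record(i)).toArray
  }
}
```

For a record `(1L, "hello", 2, "world")` grouped on fields 3 and 1, the original indices yield `("world", "hello")`. The shifted indices 0 and 1 yield `(1L, "hello")` unless the record is first passed through the prepare map.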

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-6368

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3768.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3768


commit ed03570d9bfa52e634de5a13b3425a5fd21fe6c8
Author: xccui 
Date:   2017-04-25T06:06:45Z

[FLINK-6368] Fix the wrong ordered keys problem




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...

2017-04-24 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/3284


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree library me...

2017-02-21 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/3284
  
Given the present situation of Gelly, should I close this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...

2017-02-09 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3284#discussion_r100455274
  
--- Diff: docs/dev/libs/gelly/library_methods.md ---
@@ -242,6 +242,28 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the number of represented elements.
 
+## Minimum Spanning Tree
+
+ Overview
+This is an implementation of the distributed minimum spanning tree (MST) algorithm. A minimum spanning tree for a connected and
+undirected graph is a subset of edges with minimum possible total edge weight that connects all the vertices without cycles.
+One of the possible use cases for MST could be laying out cables that connect all buildings with the minimum total cable length.
+
+ Details
+Unlike a sequential version of the algorithm, a distributed MST algorithm is based on the message-passing model.
+We use [vertex-centric iterations](#vertex-centric-iterations) to implement the Borůvka algorithm described in
+[this paper](http://ieeexplore.ieee.org/abstract/document/508073/). As there are different steps inside the iteration,
--- End diff --

Yes, Greg, you are right. A PDF is surely better than a website. On second 
thought, I'd like to change the reference to 
https://cs.uwaterloo.ca/~kdaudjee/comparison.pdf since it is a better fit.
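
For comparison with the distributed version, here is a compact sequential sketch of Borůvka's algorithm using union-find. It is not the vertex-centric Gelly implementation from this PR, and the `Edge` type is hypothetical. Equal weights are tie-broken by edge position, so the result is a valid minimum spanning forest even when weights are not distinct.

```scala
import scala.collection.mutable.ArrayBuffer

// Sequential sketch of Borůvka's algorithm; not the Gelly implementation.
object Boruvka {
  final case class Edge(u: Int, v: Int, weight: Double)

  // Returns the edges of a minimum spanning forest of an undirected graph
  // whose vertices are 0 until n.
  def minimumSpanningTree(n: Int, edges: IndexedSeq[Edge]): Seq[Edge] = {
    val parent = Array.range(0, n) // union-find forest

    def find(x: Int): Int = {
      var root = x
      while (parent(root) != root) root = parent(root)
      var cur = x
      while (parent(cur) != root) { // path compression
        val next = parent(cur)
        parent(cur) = root
        cur = next
      }
      root
    }

    // Order edges by (weight, index) so that equal weights are tie-broken consistently.
    def lighter(a: Int, b: Int): Boolean =
      edges(a).weight < edges(b).weight ||
        (edges(a).weight == edges(b).weight && a < b)

    val mst = ArrayBuffer.empty[Edge]
    var merged = true
    while (merged) {
      // cheapest(c): index of the lightest edge leaving component c, or -1.
      val cheapest = Array.fill(n)(-1)
      for (i <- edges.indices) {
        val cu = find(edges(i).u)
        val cv = find(edges(i).v)
        if (cu != cv) {
          if (cheapest(cu) == -1 || lighter(i, cheapest(cu))) cheapest(cu) = i
          if (cheapest(cv) == -1 || lighter(i, cheapest(cv))) cheapest(cv) = i
        }
      }
      // Merge each component with its neighbor along its cheapest edge.
      merged = false
      for (c <- 0 until n if cheapest(c) != -1) {
        val e = edges(cheapest(c))
        val ru = find(e.u)
        val rv = find(e.v)
        if (ru != rv) { // both endpoints may have picked the same edge
          parent(ru) = rv
          mst += e
          merged = true
        }
      }
    }
    mst.toSeq
  }
}
```

In each round, every component picks its lightest outgoing edge and merges along it, so the number of components at least halves per round. This "find cheapest edge, then merge" structure is what a message-passing version would need to spread across its iteration steps.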


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3284: [FLINK-1526] [Gelly] Add Minimum Spanning Tree lib...

2017-02-07 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/3284

[FLINK-1526] [Gelly] Add Minimum Spanning Tree library method

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3284


commit 495aa623dccecfba66124b44c7dd955b61853c94
Author: xccui 
Date:   2017-02-08T04:57:21Z

[FLINK-1526] [Gelly] Add Minimum Spanning Tree library method




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


<    1   2   3