[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129837121
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
+  private val adjustedFetchLimit = offset + fetch
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
+
+    val arity:Integer = inputRowType.getArity
+    if (outputC == null) {
+      outputC = new CRow(Row.of(arity), true)
+    }
+
+  }
+
+  override def processElement(
--- End diff --

This operator should be implemented as follows:

```
val fetchCnt = fetchCount.value
if (fetch == -1 || fetchCnt < fetch) {
  // we haven't fetched enough rows yet
  val offsetCnt = offsetCount.value
  if (offsetCnt < offset) {
    // we haven't skipped enough rows yet:
    // increment the counter and skip the row
    offsetCount.update(offsetCnt + 1)
  } else {
    // forward the row
    out.collect(inputC)
    if (fetch != -1) {
      // only count emitted rows when a fetch limit is set
      fetchCount.update(fetchCnt + 1)
    }
  }
} else {
  // we have fetched enough rows: drop the row and return
}
```

As you'll notice, `ORDER BY proctime ASC FETCH FIRST x ROWS ONLY` is quite 
pointless because it will emit only x rows and then nothing more. However, 
those are the correct semantics here. `OFFSET` is similar: it won't emit the 
first x rows, which is not really meaningful in a streaming context either.
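
To make these semantics concrete, here is a minimal plain-Scala sketch of the counting logic. It is only a model, not Flink code: the class `OffsetFetchFilter` and the object `OffsetFetchDemo` are hypothetical names, the two counters are ordinary variables rather than Flink state, and `fetch == -1` is assumed to mean "no fetch limit".

```scala
// Plain-Scala model of the offset/fetch counting logic (hypothetical names, no Flink).
// Assumptions: offset == 0 means "skip nothing", fetch == -1 means "no fetch limit".
final class OffsetFetchFilter[T](offset: Long, fetch: Long) {
  private var offsetCount = 0L // rows skipped so far
  private var fetchCount = 0L  // rows emitted so far

  /** Returns Some(row) if the row is forwarded, None if it is dropped. */
  def process(row: T): Option[T] = {
    if (fetch != -1 && fetchCount >= fetch) {
      None // enough rows were emitted already: drop everything from now on
    } else if (offsetCount < offset) {
      offsetCount += 1 // still skipping the first `offset` rows
      None
    } else {
      if (fetch != -1) fetchCount += 1 // count only when a limit is set
      Some(row)
    }
  }
}

object OffsetFetchDemo {
  /** Feeds the rows through a fresh filter in arrival order. */
  def run(offset: Long, fetch: Long, rows: Seq[Int]): Seq[Int] = {
    val filter = new OffsetFetchFilter[Int](offset, fetch)
    rows.flatMap(r => filter.process(r))
  }

  def main(args: Array[String]): Unit = {
    println(run(2, 3, 1 to 10))  // skip two rows, then emit three
    println(run(0, 3, 1 to 10))  // fetch only
    println(run(2, -1, 1 to 10)) // offset only
  }
}
```

With a fetch limit, the output stops for good once the limit is reached, however many rows arrive afterwards. That is why the clause adds little in a streaming query.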

The other combinations are basically the same. The only difference is that 
they do a bit more sorting to identify the rows that have to be dropped. The 
sorting operators have to do the sorting as before in `onTimer()` but each 

[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129833188
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
+  private val adjustedFetchLimit = offset + fetch
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
--- End diff --

We don't need state to collect records for this operator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129821546
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -108,28 +108,25 @@ class DataStreamSort(
 case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
 (sortOffset, sortFetch) match {
--- End diff --

Change the `sortOffset` and `sortFetch` member fields to `Option[RexNode]` to 
avoid `null`.
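
As a rough illustration of why `Option` helps here, the sketch below wraps a possibly-null reference once and then matches on the pair without any null checks. `Node` is only a stand-in for Calcite's `RexNode`, and `OptionFieldsDemo`, `wrap`, and `describe` are hypothetical names used for this example.

```scala
// Stand-in for Calcite's RexNode; hypothetical, for illustration only.
final case class Node(value: Long)

object OptionFieldsDemo {
  // Wrap a possibly-null reference once, at the boundary: Option(null) is None.
  def wrap(node: Node): Option[Node] = Option(node)

  // Match on the (offset, fetch) pair; the compiler checks that all cases are covered.
  def describe(sortOffset: Option[Node], sortFetch: Option[Node]): String =
    (sortOffset, sortFetch) match {
      case (Some(o), Some(f)) => s"offset ${o.value}, fetch ${f.value}"
      case (Some(o), None)    => s"offset ${o.value} only"
      case (None, Some(f))    => s"fetch ${f.value} only"
      case (None, None)       => "plain sort"
    }
}
```

Matching on `(sortOffset, sortFetch)` keeps the shape of the existing `match` in `DataStreamSort`, but a missing clause shows up as `None` instead of a `null` that every branch has to guard against.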




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129833609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
+  private val adjustedFetchLimit = offset + fetch
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
--- End diff --

Instead we need two `ValueState[Long]` for `offsetCount` and `fetchCount`.




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129833309
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
+  private val adjustedFetchLimit = offset + fetch
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
+
+    val arity:Integer = inputRowType.getArity
+    if (outputC == null) {
+      outputC = new CRow(Row.of(arity), true)
+    }
+
+  }
+
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+
+    val currentTime = ctx.timerService.currentProcessingTime
+    //buffer the event incoming event
+    stateEventsBuffer.add(input)
+
+    //deduplication of multiple registered timers is done automatically
+    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+
+  }
+  
+  override def onTimer(
--- End diff --

We don't need timers for this operator.




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129832039
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
+  private val adjustedFetchLimit = offset + fetch
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row]("sortState",
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
+
+    val arity:Integer = inputRowType.getArity
--- End diff --

Add a space after the colon: `arity: Integer`.




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129835046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.Comparator
+import java.util.ArrayList
+import java.util.Collections
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+/**
+ * Process Function used for the sort based solely on proctime with offset/fetch
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param offset Is used to indicate the number of elements to be skipped in the current context
+ * (0 offset allows to execute only fetch)
+ * @param fetch Is used to indicate the number of elements to be outputted in the current context
+ * @param inputType It is used to mark the type of the incoming data
+ */
+class ProcTimeIdentitySortProcessFunctionOffsetFetch(
+  private val offset: Int,
+  private val fetch: Int,
+  private val inputRowType: CRowTypeInfo)
+extends ProcessFunction[CRow, CRow] {
+
+  private var stateEventsBuffer: ListState[Row] = _
+  
+  private var outputC: CRow = _
--- End diff --

We don't need `outputC`. We can simply forward the input `CRow`.




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129827640
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 ---
@@ -75,12 +77,105 @@ object SortUtil {
 val inputCRowType = CRowTypeInfo(inputTypeInfo)
  
 new RowTimeSortProcessFunction(
+  0,
+  -1,
   inputCRowType,
   collectionRowComparator)
 
   }
   
   /**
+   * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting
+   * with offset elements based on rowtime and potentially other fields with
+   * @param collationSort The Sort collation list
+   * @param sortOffset The offset indicator
+   * @param inputType input row type
+   * @param inputTypeInfo input type information
+   * @param execCfg table environment execution configuration
+   * @return org.apache.flink.streaming.api.functions.ProcessFunction
+   */
+  private[flink] def createRowTimeSortFunctionOffset(
--- End diff --

I think we can consolidate all sort-related methods in `SortUtil` into 
three methods:

* `createProcTimeNoSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
* `createProcTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
* `createRowTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`

Each method handles all combinations of `offset` and `fetch` with two 
simple conditions that set each parameter to `-1`, `0`, or the actual value.
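
The two conditions could look roughly like the sketch below. It is an illustration under assumptions, not the actual `SortUtil` code: `SortParams.resolve` is a hypothetical helper, and its `Option[Long]` arguments stand in for literal values that would first have to be extracted from the `Option[RexNode]` fields. The defaults follow the `0` / `-1` arguments passed to `RowTimeSortProcessFunction` in the diff above.

```scala
object SortParams {
  // Hypothetical helper. The Option[Long] arguments stand in for values
  // already extracted from the Option[RexNode] fields.
  def resolve(sortOffset: Option[Long], sortFetch: Option[Long]): (Long, Long) = {
    val offset = sortOffset.getOrElse(0L) // no OFFSET clause: skip nothing
    val fetch = sortFetch.getOrElse(-1L)  // no FETCH clause: no limit
    (offset, fetch)
  }
}
```

Because every combination collapses into one `(offset, fetch)` pair, a single process function per sort type can cover all four cases.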




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129838980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
@@ -108,28 +108,25 @@ class DataStreamSort(
 case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
--- End diff --

There is a lot of code duplication in this class. All `createSort` methods 
look basically the same and differ mostly in the `SortUtil` methods they call. 
I think we don't need these methods and can do everything with a few `if` 
conditions directly in the `translateToPlan()` method.

Basically:
```
val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)

val pFunc = if (FlinkTypeFactory.isProctimeIndicatorType(timeType) &&
    sortCollation.getFieldCollations.size() == 1) {
  SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
  SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
  SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
} else {
  // error: neither a proctime nor a rowtime attribute
}

inputDS.keyBy(new NullByteKeySelector[CRow])
  .process(pFunc).setParallelism(1).setMaxParallelism(1)
  .returns(returnTypeInfo)
  .asInstanceOf[DataStream[CRow]]
```

We would have to change the `IdentityCRowMap` to a `ProcessFunction`, but 
that's fine. `ORDER BY proctime` is a corner case that does not add 
functionality and is only supported for syntactical completeness. IMO it is not 
worth the added code complexity.




[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction

2017-07-20 Thread rtudoran
GitHub user rtudoran opened a pull request:

https://github.com/apache/flink/pull/4380

Time sort with offset/fetch without retraction

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6075-OF-NoRetraction

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4380.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4380


commit c95bf6b55cdf3e1b48d74de584937ec6b2c36bbe
Author: rtudoran 
Date:   2017-07-20T15:36:45Z

Time sort with offset/fetch without retraction



