[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129837121 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ + private val adjustedFetchLimit = offset + fetch + + override def open(config: Configuration) 
{ +val sortDescriptor = new ListStateDescriptor[Row]("sortState", +inputRowType.asInstanceOf[CRowTypeInfo].rowType) +stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor) + +val arity:Integer = inputRowType.getArity +if (outputC == null) { + outputC = new CRow(Row.of(arity), true) +} + + } + + override def processElement( --- End diff --

This operator should be implemented as follows:

```
val fetchCnt = fetchCount.value
if (fetch == -1 || fetchCnt < fetch) {
  // we haven't fetched enough rows
  val offsetCnt = offsetCount.value
  if (offsetCnt < offset) {
    // we haven't skipped enough rows
    // increment counter and skip row
    offsetCount.update(offsetCnt + 1)
  } else {
    // forward row
    out.collect(inputC)
    if (fetch != -1) {
      // only count forwarded rows when a fetch limit is set
      fetchCount.update(fetchCnt + 1)
    }
  }
} else {
  // we fetched enough rows. drop row and return
}
```

As you'll notice, `ORDER BY proctime ASC FETCH x ROWS FIRST` is quite pointless because it will only emit x rows and then nothing more. However, those are the correct semantics here. `OFFSET` is similar because it won't emit the first x rows, which is not really meaningful in a streaming context either.

The other combinations are basically the same. The only difference is that they do a bit more sorting to identify the rows that have to be dropped. The sorting operators have to do the sorting as before in `onTimer()` but each
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129833188 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ + private val adjustedFetchLimit = offset + fetch + + override def open(config: Configuration) 
{ +val sortDescriptor = new ListStateDescriptor[Row]("sortState", --- End diff -- We don't need state to collect records for this operator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129821546 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala --- @@ -108,28 +108,25 @@ class DataStreamSort( case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => (sortOffset, sortFetch) match { --- End diff -- change `sortOffset` and `sortFetch` member fields to `Option[RexNode]` to avoid `null`.
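The `Option` suggestion can be sketched in plain Scala as follows. This is only an illustration under assumptions: `Expr` is a hypothetical stand-in for Calcite's `RexNode` so the snippet runs without dependencies, and `SortSpec`, `fromNullable` and `describe` are made-up names that do not appear in the PR.

```scala
// Hypothetical stand-in for Calcite's RexNode, used only for illustration.
final case class Expr(value: Int)

// Hypothetical holder: an absent OFFSET or FETCH clause is None, never null.
final case class SortSpec(sortOffset: Option[Expr], sortFetch: Option[Expr])

object OptionSketch {
  // Wrap possibly-null references once, at the boundary; Option(null) is None.
  def fromNullable(offset: Expr, fetch: Expr): SortSpec =
    SortSpec(Option(offset), Option(fetch))

  // Pattern matching covers every combination without explicit null checks.
  def describe(spec: SortSpec): String =
    (spec.sortOffset, spec.sortFetch) match {
      case (None, None)       => "plain sort"
      case (Some(o), None)    => s"offset ${o.value}"
      case (None, Some(f))    => s"fetch ${f.value}"
      case (Some(o), Some(f)) => s"offset ${o.value}, fetch ${f.value}"
    }
}
```

With this shape the compiler can warn about a missing combination in the `match`, which a tuple of nullable references cannot offer.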
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129833609 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ + private val adjustedFetchLimit = offset + fetch + + override def open(config: Configuration) 
{ +val sortDescriptor = new ListStateDescriptor[Row]("sortState", --- End diff -- Instead we need two `ValueState[Long]` for `offsetCount` and `fetchCount`.
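To make the two-counter idea concrete, here is a dependency-free sketch. It is an assumption-laden model, not Flink code: the two `var`s stand in for the `ValueState[Long]` counters, `OffsetFetchFilter` and `OffsetFetchDemo` are hypothetical names, and `fetch == -1` is taken to mean "no fetch limit", as in the review comments.

```scala
// Plain-Scala model of the counting logic. The two vars stand in for the
// two ValueState[Long] counters; this class is hypothetical, not Flink API.
final class OffsetFetchFilter(offset: Long, fetch: Long) {
  private var offsetCount: Long = 0L
  private var fetchCount: Long = 0L

  // Returns true if the current row should be forwarded downstream.
  def accept(): Boolean = {
    if (fetch == -1 || fetchCount < fetch) {
      if (offsetCount < offset) {
        // still skipping the first `offset` rows
        offsetCount += 1
        false
      } else {
        // forward the row; count it only when a fetch limit is set
        if (fetch != -1) fetchCount += 1
        true
      }
    } else {
      // fetch limit reached: drop the row
      false
    }
  }
}

object OffsetFetchDemo {
  // Applies the filter to rows in arrival order.
  def run(rows: Seq[Int], offset: Long, fetch: Long): Seq[Int] = {
    val filter = new OffsetFetchFilter(offset, fetch)
    rows.filter(_ => filter.accept())
  }
}
```

For example, `OffsetFetchDemo.run(1 to 10, 2, 3)` skips rows 1 and 2, forwards rows 3 to 5, and drops everything after that. No row is ever buffered, which is why neither list state nor timers are required in this model.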
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129833309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ + private val adjustedFetchLimit = offset + fetch + + override def open(config: Configuration) 
{ +val sortDescriptor = new ListStateDescriptor[Row]("sortState", +inputRowType.asInstanceOf[CRowTypeInfo].rowType) +stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor) + +val arity:Integer = inputRowType.getArity +if (outputC == null) { + outputC = new CRow(Row.of(arity), true) +} + + } + + override def processElement( +inputC: CRow, +ctx: ProcessFunction[CRow, CRow]#Context, +out: Collector[CRow]): Unit = { + +val input = inputC.row + +val currentTime = ctx.timerService.currentProcessingTime +//buffer the event incoming event +stateEventsBuffer.add(input) + +//deduplication of multiple registered timers is done automatically +ctx.timerService.registerProcessingTimeTimer(currentTime + 1) + + } + + override def onTimer( --- End diff -- we don't need timers for this operator
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129832039 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ + private val adjustedFetchLimit = offset + fetch + + override def open(config: Configuration) 
{ +val sortDescriptor = new ListStateDescriptor[Row]("sortState", +inputRowType.asInstanceOf[CRowTypeInfo].rowType) +stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor) + +val arity:Integer = inputRowType.getArity --- End diff -- +space `arity: Integer`
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129835046 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.Comparator +import java.util.ArrayList +import java.util.Collections +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +/** + * Process Function used for the sort based solely on proctime with offset/fetch + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param offset Is used to indicate the number of elements to be skipped in the current context + * (0 offset allows to execute only fetch) + * @param fetch Is used to indicate the number of elements to be outputted in the current context + * @param inputType It is used to mark the type of the incoming data + */ +class ProcTimeIdentitySortProcessFunctionOffsetFetch( + private val offset: Int, + private val fetch: Int, + private val inputRowType: CRowTypeInfo) +extends ProcessFunction[CRow, CRow] { + + private var stateEventsBuffer: ListState[Row] = _ + + private var outputC: CRow = _ --- End diff -- We don't need `outputC`. We can simply forward the input `CRow`. 
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129827640 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala --- @@ -75,12 +77,105 @@ object SortUtil { val inputCRowType = CRowTypeInfo(inputTypeInfo) new RowTimeSortProcessFunction( + 0, + -1, inputCRowType, collectionRowComparator) } /** + * Function creates [org.apache.flink.streaming.api.functions.ProcessFunction] for sorting + * with offset elements based on rowtime and potentially other fields with + * @param collationSort The Sort collation list + * @param sortOffset The offset indicator + * @param inputType input row type + * @param inputTypeInfo input type information + * @param execCfg table environment execution configuration + * @return org.apache.flink.streaming.api.functions.ProcessFunction + */ + private[flink] def createRowTimeSortFunctionOffset( --- End diff --

I think we can consolidate all sort-related methods in `SortUtil` into three methods:

* `createProcTimeNoSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
* `createProcTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`
* `createRowTimeSortFunction(..., sortOffset: Option[RexNode], sortFetch: Option[RexNode])`

Each method handles all combinations of `offset` and `fetch` with two simple conditions that set each parameter to `-1`, `0`, or the actual value.
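The "two simple conditions" could look roughly like the sketch below. It assumes the numeric values have already been extracted from the `RexNode` literals (that extraction is not shown here), and it uses `0` for "no offset" and `-1` for "no fetch limit", matching the `0, -1` defaults passed in the quoted diff. `LimitSketch` and `resolve` are hypothetical names.

```scala
object LimitSketch {
  // Hypothetical helper: map the optional clauses to the Int parameters.
  // A missing OFFSET becomes 0; a missing FETCH becomes -1 (no limit).
  def resolve(sortOffset: Option[Int], sortFetch: Option[Int]): (Int, Int) = {
    val offset = sortOffset.getOrElse(0)
    val fetch = sortFetch.getOrElse(-1)
    (offset, fetch)
  }
}
```

Each of the three factory methods could then pass the resolved pair to its process function, so separate `...Offset`, `...Fetch` and `...OffsetFetch` variants would no longer be needed.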
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4380#discussion_r129838980 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala --- @@ -108,28 +108,25 @@ class DataStreamSort( case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => --- End diff --

There is a lot of code duplication in this class. All `createSort` methods look basically the same and mostly differ in the `SortUtil` methods they call. I think we don't need these methods and can do everything with a few if conditions directly in the `translateToPlan()` method. Basically:

```
val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)

// pick the process function based on the time attribute and sort fields
val pFunc = if (FlinkTypeFactory.isProctimeIndicatorType(timeType) &&
    sortCollation.getFieldCollations.size() == 1) {
  SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
  SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
} else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
  SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
} else {
  // error
}

inputDS.keyBy(new NullByteKeySelector[CRow])
  .process(pFunc).setParallelism(1).setMaxParallelism(1)
  .returns(returnTypeInfo)
  .asInstanceOf[DataStream[CRow]]
```

We would have to change the `IdentityCRowMap` to a `ProcessFunction`, but that's fine. `ORDER BY proctime` is a corner case that does not add functionality and is only supported for syntactical completeness. IMO it is not worth the added code complexity.
[GitHub] flink pull request #4380: Time sort with offset/fetch without retraction
GitHub user rtudoran opened a pull request:

    https://github.com/apache/flink/pull/4380

    Time sort with offset/fetch without retraction

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-6075-OF-NoRetraction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4380.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4380

----
commit c95bf6b55cdf3e1b48d74de584937ec6b2c36bbe
Author: rtudoran
Date:   2017-07-20T15:36:45Z

    Time sort with offset/fetch without retraction