[GitHub] flink issue #6357: [FLINK-9720] [Scheduler] Add ResourceAttribute and MesosR...
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6357 @tillrohrmann Can you help to review this? ---
[GitHub] flink pull request #6357: [FLINK-9720] [Scheduler] Add ResourceAttribute and...
GitHub user liurenjie1024 opened a pull request: https://github.com/apache/flink/pull/6357 [FLINK-9720] [Scheduler] Add ResourceAttribute and MesosResourceAttribute. ## What is the purpose of the change *This pull request is one of PRs for adding tag support to flink scheduler. * ## Brief change log - *Add ResourceAttribute interface.* - *Add MesosResourceAttribute class.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? no You can merge this pull request into a Git repository by running: $ git pull https://github.com/liurenjie1024/flink FLINK-9720 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6357.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6357 commit 80d80a719b5825d59bfcd07a2333e78e33204923 Author: liurenjie1024 Date: 2018-07-16T15:09:32Z Add ResourceAttribute and MesosResourceAttribute. ---
[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/6214 @tillrohrmann This is from my initial design, and since the design has changed, we can close this now. ---
[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.
Github user liurenjie1024 closed the pull request at: https://github.com/apache/flink/pull/6214 ---
[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.
GitHub user liurenjie1024 opened a pull request: https://github.com/apache/flink/pull/6214 [FLINK-9669] Add assignment store interface. ## What is the purpose of the change This pull requests is part of process to enable task manager isolation in flink 1.5 session mode. ## Brief change log - Add task manager assignment class. - Add task manager assignment store interface. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`:no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/liurenjie1024/flink FLINK-9669 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6214 commit 2b6156459e7ad45cfd33147cfa27a6c7386dd4c4 Author: liurenjie1024 Date: 2018-06-27T09:08:11Z Add assignment store interface. ---
[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @twalthr I store each column separately to avoid encoding and schema management problems, also to reduce unnecessary data transfer when querying data. Issue multiple requests for multiple fields is one of the drawbacks. ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190859490 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala --- @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream protected def registerProcessingCleanupTimer( ctx: KeyedProcessFunction[K, I, O]#Context, currentTime: Long): Unit = { -if (stateCleaningEnabled) { +registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME) + } + protected def registerEventCleanupTimer( --- End diff -- The reason why I put it in the same PR is that I don't want it to block this PR, but I also agree that we should move it to a separate one. ---
[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @suez1224 Conflict resolved. ---
[GitHub] flink issue #5704: [FLINK-8852] [sql-client] Add FLIP-6 support to SQL Clien...
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5704 LGTM ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175680499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( +private val namePrefix: String, +private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { +new QueryableTableSink(this.namePrefix, this.queryConfig) + } +} + +class RowKeySelector( + private val keyIndices: Array[Int], + @transient private val returnType: TypeInformation[Row]) + extends KeySelector[JTuple2[JBool, Row], Row] +with ResultTypeQueryable[Row] { + + override def getKey(value: JTuple2[JBool, Row]): Row = { +val keys = keyIndices + +val srcRow = value.f1 + +val destRow = new Row(keys.length) +var i = 0 +while (i < keys.length) { + destRow.setField(i, srcRow.getField(keys(i))) + i += 1 +} + +destRow + } + + override def getProducedType: TypeInformation[Row] = returnType +} + +class QueryableStateProcessFunction( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig, + private val keyNames: Array[String], + private val fieldNames: Array[String], + private val fieldTypes: Array[TypeInformation[_]]) + extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], Void](queryConfig) { + + @transient private var states = Array[ValueState[AnyRef]]() + @transient private var nonKeyIndices = Array[Int]() + + override d
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175680339 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( --- End diff -- Java doc added. ---
[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5680 Can anyone help to merge this? ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174990348 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( + private val namePrefix: String, --- End diff -- Fixed. ---
[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @xccui Thanks for you suggestions. 1. Add an it test for this is difficult. Think about it, I need to ensure that elements to be processed while keeping the job running, and it's difficult to achieve this. But I will do some manual test for that. 2. I'll add doc for that. 3. I'll squash the commits when review is done. ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174366980 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.table + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +class QueryableTableSinkTest extends HarnessTestBase { + @Test + def testRowSelector(): Unit = { +val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String])) +val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*)) + +val src = Row.of(List(1), "a", "b") +val key = selector.getKey(JTuple2.of(true, src)) + +assertEquals(Row.of(List(1), "b"), key) + } + + @Test + def testProcessFunction(): Unit = { +val queryConfig = new StreamQueryConfig() + .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10)) + +val keys = Array("id") +val keyType = new RowTypeInfo(TypeInformation.of(classOf[String])) +val fieldNames = Array("id", "is_manager", "name") +val fieldTypes: Array[TypeInformation[_]] = Array( + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]]) +val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes) + +val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func) --- End diff -- Same as above. ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174366242 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. +* The method is called after [[TableSink.configure()]]. +* +* The keys array might be empty, if the table consists of a single (updated) record. +* If the table does not have a key and is append-only, the keys attribute is null. +* +* @param keys the field names of the table's keys, an empty array if the table has a single +* row, and null if the table is append-only and has no key. +*/ + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + /** +* Specifies whether the [[Table]] to write is append-only or not. +* +* @param isAppendOnly true if the table is append-only, false otherwise. +*/ + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " + +"as the table would grow infinitely") +} + } + + /** Returns the requested record type */ + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + /** Emits the DataStream. */ + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) --- End diff -- I'm waiting another [PR](https://github.com/apache/flink/pull/5680) to be merged. ---
[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5680 @bowenli86 @fhueske UT added. This class is a utility class which can be the base class of many process function implementations in flink table. Its counterpart for the legacy `ProcessFunction `interface, `ProcessFunctionWithCleanupState`, has been inherited by many implementations. In fact, my other [PR](https://github.com/apache/flink/pull/5688) depends on this. ---
[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @twalthr Please help to review this. ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
GitHub user liurenjie1024 opened a pull request: https://github.com/apache/flink/pull/5688 [FLINK-6968][Table API & SQL] Add Queryable table sink. ## What is the purpose of the change Streaming tables with unique key are continuously updated. For example queries with a non-windowed aggregation generate such tables. Commonly, such updating tables are emitted via an upsert table sink to an external datastore (k-v store, database) to make it accessible to applications. ## Brief change log - *Add a QueryableStateTableSink.* - *States are queryable.* ## Verifying this change This change added tests and can be verified as follows: - *Added test that validates that states will be stored.* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and use QueryableStateClient to test that.* ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/liurenjie1024/flink QueryableTableSink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5688.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5688 commit 0663e550216dfef8bf205f90d5ac8a0e7e77a42b Author: liurenjie1024 Date: 2018-03-12T07:32:44Z Code complete commit 6d62d53f0bae65249ab69bddf7932e62ae1e7897 Author: liurenjie1024 Date: 2018-03-13T09:44:30Z Add test commit c9ffa6ecdd638a497b60f3f063b2d352b1b98059 Author: liurenjie1024 Date: 2018-03-13T10:43:19Z Fix test style ---
[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5680 Can anyone help to merge this? ---
[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.
Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5680 @bowenli86 This is a trivial change and most the code is copied from the non keyed counterpart, so I don't think we need a test. ---
[GitHub] flink pull request #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupSt...
GitHub user liurenjie1024 opened a pull request: https://github.com/apache/flink/pull/5680 [FLINK-8919] Add KeyedProcessFunctionWithCleanupState. ## What is the purpose of the change *Add ProcessFunctionWithCleanupState's counterpart for KeyedProcessFunction.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. You can merge this pull request into a Git repository by running: $ git pull https://github.com/liurenjie1024/flink KeyedProcessFunctionWithCleanupState Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5680.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5680 commit 26915427ba2f3c8e131cbd788c7e4967e69ae2c0 Author: liurenjie1024 Date: 2018-03-12T07:43:26Z KeyedProcessFunctionWithCleanupState ---