[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581093#comment-16581093 ]

ASF GitHub Bot commented on FLINK-6968:
---

alpinegizmo commented on issue #6434: [FLINK-6968] [table] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#issuecomment-413203702

The documentation is pretty thin. It would be nice to explain that the queryable table sink can only be used in retract mode, and to include a better example. I only understood what's going on after reading the test: I found Row.of("jeff") much easier to understand than Row.of(1).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Renjie Liu
> Priority: Major
> Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example,
> queries with a non-windowed aggregation generate such tables. Commonly, such
> updating tables are emitted via an upsert table sink to an external datastore
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as
> queryable state in Flink. By storing the table in Flink's queryable state, we
> do not need an external data store to access the results of the query but can
> query the results directly from Flink.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
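To make the "updating table with a unique key" idea concrete, here is a plain-Scala model with no Flink dependencies. `UpsertTableModel`, `runningSum` and `materialize` are hypothetical names, not part of the PR. The sketch assumes the `UpsertStreamTableSink` encoding, where each message is a `(flag, row)` pair: `true` inserts or overwrites the row with that key and `false` deletes it. Fed with the test data from PR #6434 (`("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4)`), four input records collapse into two keyed rows.

```scala
// Hypothetical, simplified model of an updating table keyed by `name`.
object UpsertTableModel {
  // A row of the aggregated table: (name, sm); `name` is the unique key.
  type KeyedRow = (String, Int)

  /** Emits the upsert messages a running SUM(money) ... GROUP BY name would produce. */
  def runningSum(input: Seq[(String, Int)]): Seq[(Boolean, KeyedRow)] = {
    val sums = scala.collection.mutable.Map.empty[String, Int]
    input.map { case (name, money) =>
      val sm = sums.getOrElse(name, 0) + money
      sums(name) = sm
      (true, (name, sm)) // true = upsert (insert or overwrite by key)
    }
  }

  /** Applies upsert/delete messages to an initially empty keyed table. */
  def materialize(messages: Seq[(Boolean, KeyedRow)]): Map[String, Int] =
    messages.foldLeft(Map.empty[String, Int]) {
      case (table, (true, (name, sm))) => table.updated(name, sm) // overwrite by key
      case (table, (false, (name, _))) => table - name            // delete by key
    }
}
```

Because every update overwrites the entry for its key, the state stays proportional to the number of distinct keys. An append-only table offers no such overwrites, which is why the sink rejects it ("the table would grow infinitely").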
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576527#comment-16576527 ]

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209314950

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sink/queryable/QueryableTableSinkTest.scala
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sink.queryable
+
+import java.time.Duration
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.{Deadline, Time}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.queryablestate.client.QueryableStateClient
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.sinks.queryable.QueryableTableSink
+import org.apache.flink.types.Row
+import org.hamcrest.core.Is
+import org.junit.Assert._
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Rule, Test}
+
+
+class QueryableTableSinkTest extends QueryableSinkTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+  val _tempFolder = new TemporaryFolder
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  val _expectedException = ExpectedException.none()
+  @Rule
+  def expectedException: ExpectedException = _expectedException
+
+  def getStateBackend: StateBackend = {
+    val dbPath = tempFolder.newFolder().getAbsolutePath
+    val checkpointPath = tempFolder.newFolder().toURI.toString
+    val backend = new RocksDBStateBackend(checkpointPath)
+    backend.setDbStoragePath(dbPath)
+    backend
+  }
+
+  @Test
+  def testQueryableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    //name, money
+    val data = List(("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4))
+    val source = new TestKVListSource[String, Int](data)
+
+    // select name, sum(money) as sm from t group by name
+    val t = env.addSource(source).toTable(tEnv, 'name, 'money)
+      .groupBy("name")
+      .select("name, sum(money) as sm")
+
+    val queryableSink = new QueryableTableSink("prefix",
+      new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(7)))
+
+    t.writeToSink(queryableSink)
+
+    val clusterClient = QueryableSinkTestBase.miniClusterResource.getClusterClient
+    val deadline = Deadline.now.plus(Duration.ofSeconds(100))
+
+    val autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env.getJavaEnv)
+    val client = new QueryableStateClient("localhost", 9084)
+

Review comment:
The client needs to be shut down at the end of the test.
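One way to address the shutdown comment is to tie the client's lifetime to a `try`/`finally` block, so it is released even when an assertion fails. The sketch below is a generic loan-pattern helper; `ResourceSupport.withResource` and `FakeClient` are hypothetical stand-ins so the example runs without a cluster.

```scala
// Hypothetical loan-pattern helper (not part of the PR).
object ResourceSupport {
  /** Runs `body` with `resource` and always calls `release`, even if `body` throws. */
  def withResource[R, T](resource: R)(release: R => Unit)(body: R => T): T =
    try body(resource)
    finally release(resource)
}

/** Hypothetical stand-in for a client that must be shut down explicitly. */
final class FakeClient {
  var isShutDown = false

  def query(key: String): Int =
    if (isShutDown) throw new IllegalStateException("client already shut down")
    else key.length

  def shutdown(): Unit = isShutDown = true
}
```

In the test itself, the resource would be the `QueryableStateClient` and `release` its shutdown method, e.g. `withResource(new QueryableStateClient("localhost", 9084))(_.shutdownAndWait()) { client => ... }`. This assumes the Flink version in use provides `shutdownAndWait()`; check the client's API for that version.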
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576452#comment-16576452 ]

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209300017

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/queryable/QueryableTableSink.scala
##
@@ -0,0 +1,117 @@
+package org.apache.flink.table.sinks.queryable
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.sinks.{TableSinkBase, UpsertStreamTableSink}
+import org.apache.flink.types.Row
+
+/**
+ * A QueryableTableSink stores a table in queryable state.
+ *
+ * This class stores a table in queryable state so that users can access the table data
+ * without depending on external storage.
+ *
+ * Since this is only a key-value store, users can currently only run point queries against it.
+ *
+ * Example:
+ * {{{
+ *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+ *   val tEnv = TableEnvironment.getTableEnvironment(env)
+ *
+ *   val table: Table = ...
+ *
+ *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+ *     "prefix",
+ *     queryConfig)
+ *
+ *   table.writeToSink(queryableTableSink)
+ * }}}
+ *
+ * Once the program is running, users can access the state with a QueryableStateClient.
+ * {{{
+ *   val client = new QueryableStateClient(tmHostname, proxyPort)
+ *   val data = client.getKvState(
+ *     jobId,
+ *     "prefix-column1",
+ *     Row.of(1),
+ *     new RowTypeInfo(Array[TypeInformation[_]](TypeInformation.of(classOf[Int])), Array("id")),
+ *     stateDescriptor)
+ *     .get()
+ * }}}
+ *
+ * @param namePrefix  prefix of the queryable state names, e.g. "prefix" in "prefix-column1"
+ * @param queryConfig stream query configuration, e.g. the idle state retention time
+ */
+class QueryableTableSink(
+    private val namePrefix: String,
+    private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+    with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+    if (keys == null) {
+      throw new IllegalArgumentException("keys can't be null!")
+    }
+    this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+    if (isAppendOnly) {
+      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+        "tables as the table would grow infinitely")
+    }
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    val keyIndices = keys.map(getFieldNames.indexOf(_))
+    val keyTypes = keyIndices.map(getFieldTypes(_))
+
+    val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+    val processFunction = new QueryableStateProcessFunction(
+      namePrefix,
+      queryConfig,
+      keys,
+      getFieldNames,
+      getFieldTypes)
+
+    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+      .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+    new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+
+
+

Review comment:
Remove this block of blank lines.
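`emitDataStream` resolves the key field names to column positions once and then keys the stream by the projected key. A plain-Scala sketch of that projection follows. `KeyProjection`, `keyIndices` and `projectKey` are hypothetical names, and rows are modeled as `Array[Any]` rather than Flink `Row`s.

```scala
// Hypothetical model of the key handling in emitDataStream.
object KeyProjection {
  /** Resolves key field names to column positions (done once per sink). */
  def keyIndices(fieldNames: Array[String], keys: Array[String]): Array[Int] =
    keys.map { k =>
      val idx = fieldNames.indexOf(k) // -1 if the name is unknown
      require(idx >= 0, s"unknown key field: $k")
      idx
    }

  /** Projects a row onto the key positions; rows with equal projections share a key. */
  def projectKey(row: Array[Any], indices: Array[Int]): Seq[Any] =
    indices.toSeq.map(i => row(i))
}
```

Unlike the quoted `keys.map(getFieldNames.indexOf(_))`, the sketch fails fast on an unknown key name instead of propagating the `-1` that `indexOf` returns.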
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576440#comment-16576440 ]

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209297911

## File path: flink-libraries/flink-table/pom.xml
##
@@ -186,6 +186,13 @@ under the License.
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-queryable-state-runtime_2.11</artifactId>

Review comment:
Replace the hardcoded 2.11 with ${scala.binary.version}.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559159#comment-16559159 ]

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 commented on issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/5688#issuecomment-408291783

I'm sorry for messing this up with a wrong rebase. I've opened a cleaner PR, https://github.com/apache/flink/pull/6434, for the same issue.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559156#comment-16559156 ]

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 commented on issue #6434: [FLINK-6968][Table API & SQL] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#issuecomment-408291551

I'm sorry for messing up https://github.com/apache/flink/pull/5688 with a wrong rebase, so I opened this new PR. @twalthr Could you please help review it? I've addressed your comments from https://github.com/apache/flink/pull/5688 and added an integration test.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559155#comment-16559155 ]

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 opened a new pull request #6434: [FLINK-6968][Table API & SQL] Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434

## What is the purpose of the change

Streaming tables with a unique key are continuously updated. For example, queries with a non-windowed aggregation generate such tables. Commonly, such updating tables are emitted via an upsert table sink to an external datastore (k-v store, database) to make them accessible to applications.

## Brief change log

  - *Add a QueryableTableSink.*
  - *Make the stored state queryable.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test that validates that the table is stored in state.*

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490567#comment-16490567 ]

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5688

    @twalthr I store each column separately to avoid encoding and schema management problems, and also to reduce unnecessary data transfer when querying data. One drawback is that reading multiple fields requires issuing multiple requests.
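The per-column layout can be modeled as one key/value map per non-key column, named `<prefix>-<column>` as in the `"prefix-column1"` example from the sink's Scaladoc. In the sketch below, `PerColumnStore` is hypothetical and assumes exactly one state per non-key column. Reading one column costs a single point lookup, while reading a whole row costs one lookup per column, which is the trade-off described in this comment.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the per-column state layout.
final class PerColumnStore(prefix: String, keyField: String, fields: Seq[String]) {
  private val valueColumns = fields.filterNot(_ == keyField)

  // One independent key -> value map per non-key column, named "<prefix>-<column>".
  private val states: Map[String, mutable.Map[Any, Any]] =
    valueColumns.map(c => s"$prefix-$c" -> mutable.Map.empty[Any, Any]).toMap

  var requests = 0 // counts simulated point lookups

  def stateNames: Set[String] = states.keySet

  /** Writes each non-key column of `row` into its own state, keyed by the key field. */
  def upsert(row: Map[String, Any]): Unit =
    valueColumns.foreach(c => states(s"$prefix-$c")(row(keyField)) = row(c))

  /** One point lookup against a single column's state. */
  def get(column: String, key: Any): Option[Any] = {
    requests += 1
    states(s"$prefix-$column").get(key)
  }

  /** Reading a full row needs one lookup per value column. */
  def getRow(key: Any): Map[String, Option[Any]] =
    valueColumns.map(c => c -> get(c, key)).toMap
}
```

Storing columns separately means no row serializer or schema has to be shared with clients, at the cost of extra round trips for multi-column reads.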
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490564#comment-16490564 ]

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190859490

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ---
    @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
       protected def registerProcessingCleanupTimer(
           ctx: KeyedProcessFunction[K, I, O]#Context,
           currentTime: Long): Unit = {
    -    if (stateCleaningEnabled) {
    +    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
    +  }
    +  protected def registerEventCleanupTimer(
    --- End diff --

    I put it in the same PR because I didn't want it to block this PR, but I agree that we should move it to a separate one.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486957#comment-16486957 ]

ASF GitHub Bot commented on FLINK-6968:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190176842

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ---
    @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
       protected def registerProcessingCleanupTimer(
           ctx: KeyedProcessFunction[K, I, O]#Context,
           currentTime: Long): Unit = {
    -    if (stateCleaningEnabled) {
    +    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
    +  }
    +  protected def registerEventCleanupTimer(
    --- End diff --

    We implemented state cleanup in processing time because it is easier for users to reason about and doesn't interfere as much with event-time processing (it is not yet possible to distinguish timers). However, it also has a few shortcomings, such as state being cleared when a query is recovered from a savepoint (which we don't really encourage at the moment).
    Anyway, introducing event-time state cleanup should definitely go into a separate issue and PR.
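The processing-time cleanup discussed here is driven by the idle state retention configured through `withIdleStateRetentionTime(min, max)` (1 and 7 minutes in the PR's test). The sketch below is a simplified, hypothetical model of the rule as we understand it; `CleanupTimerModel` is not a Flink class. On each state access, a cleanup timer at `currentTime + max` is registered only if no timer exists yet or the existing one would fire before `currentTime + min`. This avoids re-registering a timer on every access. The model ignores event time and the deletion of the superseded timer.

```scala
// Hypothetical, simplified model of idle-state retention timers (processing time, in ms).
final class CleanupTimerModel(minRetention: Long, maxRetention: Long) {
  require(maxRetention >= minRetention, "max retention must not be below min retention")

  // Timestamp of the currently registered cleanup timer, if any.
  private var registered: Option[Long] = None

  /** Called on every state access; returns the newly registered timer, if any. */
  def onAccess(currentTime: Long): Option[Long] = registered match {
    // The existing timer fires late enough: keep it.
    case Some(t) if currentTime + minRetention <= t => None
    // No timer yet, or it would fire too early: register a new one at the max bound.
    case _ =>
      val cleanupTime = currentTime + maxRetention
      registered = Some(cleanupTime)
      registered
  }

  def currentTimer: Option[Long] = registered
}
```

With 1 and 7 minutes, an access at t = 0 registers a timer at 420000 ms. An access at 100000 ms keeps it, since 160000 <= 420000. An access at 400000 ms moves it to 820000 ms.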
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486823#comment-16486823 ]

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190136859

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    + * A QueryableTableSink stores a table in queryable state.
    + *
    + * This class stores a table in queryable state so that users can access the table data
    + * without depending on external storage.
    + *
    + * Since this is only a key-value store, users can currently only run point queries against it.
    + *
    + * Example:
    + * {{{
    + *   val env = StreamExecutionEnvironment.getExecutionEnvironment
    + *   val tEnv = TableEnvironment.getTableEnvironment(env)
    + *
    + *   val table: Table = ...
    + *
    + *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    + *     "prefix",
    + *     queryConfig,
    + *     None)
    + *
    + *   table.writeToSink(queryableTableSink)
    + * }}}
    + *
    + * Once the program is running, users can access the state with a QueryableStateClient.
    + * {{{
    + *   val client = new QueryableStateClient(tmHostname, proxyPort)
    + *   val data = client.getKvState(
    + *     jobId,
    + *     "prefix-column1",
    + *     Row.of(1),
    + *     new RowTypeInfo(Array[TypeInformation[_]](TypeInformation.of(classOf[Int])), Array("id")),
    + *     stateDescriptor)
    + *     .get()
    + * }}}
    + *
    + * @param namePrefix        prefix of the queryable state names, e.g. "prefix" in "prefix-column1"
    + * @param queryConfig       stream query configuration, e.g. the idle state retention time
    + * @param cleanupTimeDomain time domain of the state cleanup timers
    + */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486829#comment-16486829 ]

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190141550

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---
    @@ -78,6 +82,20 @@ public int numKeyedStateEntries() {
     		}
     	}
    +	public S getState(K key, StateDescriptorstateDesc) throws Exception {
    --- End diff --

    Is this change necessary? We should only modify code outside of `flink-table` if it is urgently needed.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486824#comment-16486824 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190137151 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain} +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A QueryableTableSink stores a table in queryable state. + * + * This class stores a table in queryable state so that users can access the table data without + * depending on external storage. + * + * Since this is only a key-value store, users can currently only run point queries against it. + * + * Example: + * {{{ + * val env = StreamExecutionEnvironment.getExecutionEnvironment + * val tEnv = TableEnvironment.getTableEnvironment(env) + * + * val table: Table = ... + * + * val queryableTableSink: QueryableTableSink = new QueryableTableSink( + * "prefix", + * queryConfig, + * None) + * + * tEnv.writeToSink(table, queryableTableSink, queryConfig) + * }}} + * + * Once the program is running, users can access the state with a QueryableStateClient.
+ * {{{ + * val client = new QueryableStateClient(tmHostname, proxyPort) + * val data = client.getKvState( + * jobId, + * "prefix-column1", + * Row.of(1), + * new RowTypeInfo(Array[TypeInformation[_]](TypeInformation.of(classOf[Int])), Array("id")), + * stateDescriptor) + * .get() + * + * }}} + * + * + * @param namePrefix + * @param queryConfig + * @param cleanupTimeDomain + */ +class QueryableTableSink( +private val namePrefix: String, +private val queryConfig: StreamQueryConfig, +private val cleanupTimeDomain: Option[TimeDomain]) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new
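In `emitDataStream`, the key field names are resolved to positions with `keys.map(getFieldNames.indexOf(_))`, and the key columns are then projected out of each row (the `testRowSelector` test checks that indices (0, 2) turn `Row.of(List(1), "a", "b")` into `Row.of(List(1), "b")`). A minimal plain-Java sketch of the same idea, assuming rows are modelled as `List<Object>`; `KeyProjection` is a hypothetical name, not the PR's `RowKeySelector`. Since `indexOf` returns -1 for a missing name, the sketch rejects unknown key names up front rather than failing later with an index error:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free sketch of key extraction: resolve key field
// names to positions once, then project those columns out of every row.
final class KeyProjection {
    private final int[] keyIndices;

    KeyProjection(String[] fieldNames, String[] keyNames) {
        List<String> names = Arrays.asList(fieldNames);
        keyIndices = new int[keyNames.length];
        for (int i = 0; i < keyNames.length; i++) {
            int idx = names.indexOf(keyNames[i]);
            if (idx < 0) {
                // indexOf yields -1 for an unknown name; fail with a clear message
                // instead of a bare out-of-bounds error further down the line
                throw new IllegalArgumentException("Unknown key field: " + keyNames[i]);
            }
            keyIndices[i] = idx;
        }
    }

    int[] keyIndices() {
        return keyIndices.clone();
    }

    // Copies the key columns of a row, in key order, into a new list.
    List<Object> getKey(List<Object> row) {
        List<Object> key = new ArrayList<>(keyIndices.length);
        for (int idx : keyIndices) {
            key.add(row.get(idx));
        }
        return key;
    }
}
```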
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486830#comment-16486830 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190141671 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.TimeDomain +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +class QueryableTableSinkTest extends HarnessTestBase { + @Test + def testRowSelector(): Unit = { +val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), + TypeInformation.of(classOf[String])) +val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*)) + +val src = Row.of(List(1), "a", "b") +val key = selector.getKey(JTuple2.of(true, src)) + +assertEquals(Row.of(List(1), "b"), key) + } + + @Test + def testProcessFunction(): Unit = { +val queryConfig = new StreamQueryConfig() + .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10)) + +val keys = Array("id") +val keyType = new RowTypeInfo(TypeInformation.of(classOf[String])) +val fieldNames = Array("id", "is_manager", "name") +val fieldTypes: Array[TypeInformation[_]] = Array( + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]]) +val func = new QueryableStateProcessFunction( + "test", + queryConfig, + keys, + fieldNames, + fieldTypes, + TimeDomain.PROCESSING_TIME) + +val operator = new KeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func) + +val testHarness = createHarnessTester(operator, + new RowKeySelector(Array(0), keyType), + keyType) + +testHarness.open() + +val stateDesc1 = new ValueStateDescriptor[JBool]("is_manager", + TypeInformation.of(classOf[JBool])) +stateDesc1.initializeSerializerUnlessSet(operator.getExecutionConfig) +val stateDesc2 = new ValueStateDescriptor[String]("name", TypeInformation.of(classOf[String])) +stateDesc2.initializeSerializerUnlessSet(operator.getExecutionConfig) +val key1 = Row.of("1") +val key2 = Row.of("2") + +testHarness.processElement(JTuple2.of(true, Row.of("1", JBool.valueOf(true), "jeff")), 2) +testHarness.processElement(JTuple2.of(true, Row.of("2", JBool.valueOf(false), "dean")), 6) + +val stateOf = (key: Row, sd: ValueStateDescriptor[_]) => { + testHarness.getState(key, sd).value().asInstanceOf[AnyRef] +} + +var expectedData = Array( + Row.of(JBool.valueOf(true), "jeff"), + Row.of(JBool.valueOf(false), "dean")) +var storedData = Array( + Row.of(stateOf(key1, stateDesc1), stateOf(key1, stateDesc2)), +
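The harness test reads one `ValueState` per non-key column (`is_manager` and `name`) for each key, while the Scaladoc example queries a state called `"prefix-column1"`, which suggests that each column is exposed under a name of the form `<prefix>-<column>`. The sketch below models that column-per-state layout with plain maps; the class `ColumnStateModel` and the naming rule are assumptions for illustration, not the PR's actual `QueryableStateProcessFunction`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a column-per-state layout: every non-key column gets
// its own key -> value map, registered under "<prefix>-<column>" (assumed).
final class ColumnStateModel {
    private final Map<String, Map<String, Object>> statesByName = new HashMap<>();
    private final String prefix;
    private final String[] valueColumns;

    ColumnStateModel(String prefix, String[] valueColumns) {
        this.prefix = prefix;
        this.valueColumns = valueColumns.clone();
        for (String column : this.valueColumns) {
            statesByName.put(stateName(column), new HashMap<>());
        }
    }

    String stateName(String column) {
        return prefix + "-" + column;
    }

    // An upsert writes every value column for the key; a delete clears them.
    void process(boolean isUpsert, String key, List<Object> values) {
        for (int i = 0; i < valueColumns.length; i++) {
            Map<String, Object> state = statesByName.get(stateName(valueColumns[i]));
            if (isUpsert) {
                state.put(key, values.get(i));
            } else {
                state.remove(key);
            }
        }
    }

    // Point query against one named state, like a queryable-state lookup.
    Object query(String stateName, String key) {
        Map<String, Object> state = statesByName.get(stateName);
        if (state == null) {
            throw new IllegalArgumentException("No such state: " + stateName);
        }
        return state.get(key);
    }
}
```

Splitting the row into one state per column means a client has to issue one lookup per column it needs; that trade-off follows from the assumed layout, not from anything stated in the review.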
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486831#comment-16486831 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190140977 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486828#comment-16486828 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190140777 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486826#comment-16486826 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190140171 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486827#comment-16486827 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190137656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486822#comment-16486822 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190135653 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain} +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A QueryableTableSink stores table in queryable state. --- End diff -- Update the comment.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486825#comment-16486825 ] ASF GitHub Bot commented on FLINK-6968: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190136036 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala --- @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream protected def registerProcessingCleanupTimer( ctx: KeyedProcessFunction[K, I, O]#Context, currentTime: Long): Unit = { -if (stateCleaningEnabled) { +registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME) + } + protected def registerEventCleanupTimer( --- End diff -- @fhueske is there a reason why we have no event-time clean-up state so far? I think it would make sense to move this change to a separate PR. We should make the notion of time configurable through `StreamQueryConfig`.
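For context on the clean-up discussion: idle-state retention in the Table API is configured with a minimum and a maximum retention time (the `QueryableTableSinkTest` harness test uses `withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))`). The sketch below is a Flink-free model of the timer rule as we understand it from the existing processing-time clean-up helpers: a new timer at `currentTime + maxRetention` is registered only when the current one would fire before `currentTime + minRetention`. Passing the time domain as a parameter mirrors the suggestion to make the notion of time configurable; `CleanupTimerModel` and its nested `TimeDomain` enum are hypothetical stand-ins, not Flink classes:

```java
// Hypothetical, Flink-free model of idle-state clean-up timers with a
// minimum and maximum retention time. TimeDomain here is a local stand-in.
final class CleanupTimerModel {
    enum TimeDomain { PROCESSING_TIME, EVENT_TIME }

    private final long minRetention;
    private final long maxRetention;
    private final TimeDomain domain;
    private Long cleanupTime; // null until the first timer is registered

    CleanupTimerModel(long minRetention, long maxRetention, TimeDomain domain) {
        if (minRetention < 0 || maxRetention < minRetention) {
            throw new IllegalArgumentException("require 0 <= minRetention <= maxRetention");
        }
        this.minRetention = minRetention;
        this.maxRetention = maxRetention;
        this.domain = domain;
    }

    // Called for every record of a key with the current time of the chosen
    // domain (assumed: wall-clock time for processing time, a record or
    // watermark timestamp for event time). Returns true if a timer is (re)set.
    boolean onRecord(long currentTime) {
        // Only move the timer when the existing one could delete state that
        // must still be kept for at least minRetention.
        if (cleanupTime == null || currentTime + minRetention > cleanupTime) {
            cleanupTime = currentTime + maxRetention;
            return true;
        }
        return false;
    }

    Long cleanupTime() {
        return cleanupTime;
    }

    TimeDomain domain() {
        return domain;
    }
}
```

With a minimum of 2 and a maximum of 10, a record at time 2 sets a timer for 12; a record at 6 leaves it alone (6 + 2 = 8 is not past 12); a record at 11 moves it to 21. The gap between the two retention times is what limits how often timers are re-registered.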
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447755#comment-16447755 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @suez1224 Conflict resolved.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420721#comment-16420721 ] ASF GitHub Bot commented on FLINK-6968: --- Github user suez1224 commented on the issue: https://github.com/apache/flink/pull/5688 Could you please rebase your PR to resolve the conflict? Thanks.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405945#comment-16405945 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175680499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( +private val namePrefix: String, +private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, 
keySelectorType)) + .process(processFunction) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { +new QueryableTableSink(this.namePrefix, this.queryConfig) + } +} + +class RowKeySelector( + private val keyIndices: Array[Int], + @transient private val returnType: TypeInformation[Row]) + extends KeySelector[JTuple2[JBool, Row], Row] +with ResultTypeQueryable[Row] { + + override def getKey(value: JTuple2[JBool, Row]): Row = { +val keys = keyIndices + +val srcRow = value.f1 + +val destRow = new Row(keys.length) +var i = 0 +while (i < keys.length) { + destRow.setField(i, srcRow.getField(keys(i))) + i += 1 +} + +destRow + } + + override def getProducedType: TypeInformation[Row] = returnType +} + +class QueryableStateProcessFunction( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig, + private val keyNames: Array[String], + private val fieldNames: Array[String], + private val fieldTypes: Array[TypeInformation[_]]) + extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row],
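The key handling in this diff has two steps: `emitDataStream` resolves the key field names to positions with `keys.map(getFieldNames.indexOf(_))`, and `RowKeySelector.getKey` copies those positions out of the incoming row into a smaller key row. A minimal plain-Scala sketch of those two steps follows. It is not the PR's code: rows are modeled as `Vector[Any]` instead of `org.apache.flink.types.Row` so it runs without Flink, and `KeyProjection` is a hypothetical name. As a design choice, the sketch fails fast on an unknown key name, because `Array.indexOf` returns -1 in that case.

```scala
// Hypothetical sketch of the key projection done by RowKeySelector.getKey.
// Assumption: a row is a Vector[Any] rather than a Flink Row.
object KeyProjection {
  // Resolve key field names to positions. Array.indexOf returns -1 for a
  // name that is not in the schema, so reject it instead of passing -1 on.
  def keyIndices(fieldNames: Array[String], keys: Array[String]): Array[Int] =
    keys.map { k =>
      val i = fieldNames.indexOf(k)
      require(i >= 0, s"unknown key field: $k")
      i
    }

  // Copy the key fields of `row`, in key order, into a new (smaller) row.
  def project(row: Vector[Any], indices: Array[Int]): Vector[Any] =
    indices.toVector.map(i => row(i))
}

val fieldNames = Array("id", "is_manager", "name")
val idx = KeyProjection.keyIndices(fieldNames, Array("id"))
val key = KeyProjection.project(Vector("jeff", true, "Jeff"), idx)
```

The second test assertion mirrors `testRowSelector` from the PR's test file: projecting positions 0 and 2 of a three-field row keeps the first and last fields.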
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405944#comment-16405944 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175680339 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( --- End diff -- Java doc added.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404297#comment-16404297 ] ASF GitHub Bot commented on FLINK-6968: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175315979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( --- End diff -- Besides @xccui's suggestion to add formal docs, could you also add a Javadoc-style comment here explaining what this table sink does and how it is intended to be used?
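Going by the issue description and the reviewed diff, which keys the incoming `(Boolean, Row)` stream by the table's unique key, the sink is meant to hold the latest row for every key so that it can be looked up directly. A minimal plain-Scala sketch of that behavior follows. The assumptions are not the PR's code: rows are `Vector[Any]`, a `mutable.Map` stands in for Flink's keyed `ValueState`, the Boolean flag follows the `UpsertStreamTableSink` convention (`true` = upsert, `false` = delete), and `UpsertTableState` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-key state of a queryable upsert sink.
// A mutable.Map plays the role of Flink's keyed ValueState.
final class UpsertTableState(keyIndices: Seq[Int]) {
  private val byKey = mutable.Map.empty[Vector[Any], Vector[Any]]

  // Extract the unique-key fields of a row.
  private def keyOf(row: Vector[Any]): Vector[Any] =
    keyIndices.toVector.map(i => row(i))

  // Apply one upsert-stream message: true = insert/update, false = delete.
  def process(msg: (Boolean, Vector[Any])): Unit = {
    val (isUpsert, row) = msg
    if (isUpsert) byKey.update(keyOf(row), row)
    else byKey.remove(keyOf(row))
  }

  // Point lookup by key, analogous to a client querying the state for one key.
  def lookup(key: Vector[Any]): Option[Vector[Any]] = byKey.get(key)
}

val state = new UpsertTableState(Seq(0))
val messages: Seq[(Boolean, Vector[Any])] = Seq(
  (true, Vector("jeff", false, "Jeff")),  // insert
  (true, Vector("jeff", true, "Jeff")),   // update of the same key
  (true, Vector("anna", false, "Anna")),  // insert
  (false, Vector("anna", false, "Anna"))  // delete
)
messages.foreach(state.process)
```

Keeping only the latest row per key is what keeps this state bounded. That matches the reason the reviewed `setIsAppendOnly` gives for rejecting append-only tables: without per-key updates or deletes, the stored table would only grow.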
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404298#comment-16404298 ] ASF GitHub Bot commented on FLINK-6968: --- Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r175317673 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( +private val namePrefix: String, +private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, 
keySelectorType)) + .process(processFunction) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { +new QueryableTableSink(this.namePrefix, this.queryConfig) + } +} + +class RowKeySelector( + private val keyIndices: Array[Int], + @transient private val returnType: TypeInformation[Row]) + extends KeySelector[JTuple2[JBool, Row], Row] +with ResultTypeQueryable[Row] { + + override def getKey(value: JTuple2[JBool, Row]): Row = { +val keys = keyIndices + +val srcRow = value.f1 + +val destRow = new Row(keys.length) +var i = 0 +while (i < keys.length) { + destRow.setField(i, srcRow.getField(keys(i))) + i += 1 +} + +destRow + } + + override def getProducedType: TypeInformation[Row] = returnType +} + +class QueryableStateProcessFunction( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig, + private val keyNames: Array[String], + private val fieldNames: Array[String], + private val fieldTypes: Array[TypeInformation[_]]) + extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row],
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401424#comment-16401424 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174990348 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( + private val namePrefix: String, --- End diff -- Fixed.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398795#comment-16398795 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174511059 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink( + private val namePrefix: String, --- End diff -- Please use a four-space indent for parameter declarations (this also applies to method parameters).
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398173#comment-16398173 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @xccui Thanks for your suggestions. 1. Adding an IT case for this is difficult: I would need to ensure that the elements have been processed while the job keeps running, which is hard to achieve. I will do some manual testing instead. 2. I'll add docs for it. 3. I'll squash the commits when the review is done.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398169#comment-16398169 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174366980 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +class QueryableTableSinkTest extends HarnessTestBase { + @Test + def testRowSelector(): Unit = { +val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String])) +val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*)) + +val src = Row.of(List(1), "a", "b") +val key = selector.getKey(JTuple2.of(true, src)) + +assertEquals(Row.of(List(1), "b"), key) + } + + @Test + def testProcessFunction(): Unit = { +val queryConfig = new StreamQueryConfig() + .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10)) + +val keys = Array("id") +val keyType = new RowTypeInfo(TypeInformation.of(classOf[String])) +val fieldNames = Array("id", "is_manager", "name") +val fieldTypes: Array[TypeInformation[_]] = Array( + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]]) +val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes) + +val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func) --- End diff -- Same as above. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398166#comment-16398166 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174366242 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. +* The method is called after [[TableSink.configure()]]. +* +* The keys array might be empty, if the table consists of a single (updated) record. +* If the table does not have a key and is append-only, the keys attribute is null. +* +* @param keys the field names of the table's keys, an empty array if the table has a single +* row, and null if the table is append-only and has no key. +*/ + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + /** +* Specifies whether the [[Table]] to write is append-only or not. +* +* @param isAppendOnly true if the table is append-only, false otherwise. 
+*/ + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " + +"as the table would grow infinitely") +} + } + + /** Returns the requested record type */ + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + /** Emits the DataStream. */ + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) --- End diff -- I'm waiting for another [PR](https://github.com/apache/flink/pull/5680) to be merged.
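The `setKeyFields` Javadoc in this diff notes that the keys array may be empty when the table consists of a single (updated) record. The small plain-Scala sketch below shows why an empty key still works for a keyed store: projecting zero key fields produces the same empty key for every row, so each upsert overwrites the one stored row. The assumptions are not the PR's code: rows are `Vector[Any]`, an immutable `Map` stands in for keyed state, `true` marks an upsert and `false` a delete, and `EmptyKeyDemo` is a hypothetical name.

```scala
// Hypothetical helper: fold an upsert stream into a key -> row map.
object EmptyKeyDemo {
  // Project the key fields (possibly none) out of a row.
  def keyOf(row: Vector[Any], keyIndices: Seq[Int]): Vector[Any] =
    keyIndices.toVector.map(i => row(i))

  // true = insert/update the row for its key, false = delete that key.
  def materialize(
      msgs: Seq[(Boolean, Vector[Any])],
      keyIndices: Seq[Int]): Map[Vector[Any], Vector[Any]] =
    msgs.foldLeft(Map.empty[Vector[Any], Vector[Any]]) { case (acc, (isUpsert, row)) =>
      val key = keyOf(row, keyIndices)
      if (isUpsert) acc.updated(key, row) else acc - key
    }
}

// A single, continuously updated row (for example a running count) with no key fields.
val updates: Seq[(Boolean, Vector[Any])] =
  Seq((true, Vector(1L)), (true, Vector(2L)), (true, Vector(3L)))
val table = EmptyKeyDemo.materialize(updates, Seq.empty)
```

With a non-empty key list, the same fold keeps one row per distinct key instead.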
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398018#comment-16398018 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174338993 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory --- End diff -- Remove unused imports.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398015#comment-16398015 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174338968 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. --- End diff -- Use the full class name for `[[Table]]` since it's not imported.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398020#comment-16398020 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174339810 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +class QueryableTableSinkTest extends HarnessTestBase { + @Test + def testRowSelector(): Unit = { +val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String])) +val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*)) + +val src = Row.of(List(1), "a", "b") +val key = selector.getKey(JTuple2.of(true, src)) + +assertEquals(Row.of(List(1), "b"), key) + } + + @Test + def testProcessFunction(): Unit = { +val queryConfig = new StreamQueryConfig() + .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10)) + +val keys = Array("id") +val keyType = new RowTypeInfo(TypeInformation.of(classOf[String])) +val fieldNames = Array("id", "is_manager", "name") +val fieldTypes: Array[TypeInformation[_]] = Array( + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]]) +val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes) + +val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func) + +val testHarness = createHarnessTester(operator, + new 
RowKeySelector(Array(0), keyType), + keyType) + +testHarness.open() + + --- End diff -- Remove extra blank lines.
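The `testRowSelector` case above expects `RowKeySelector(Array(0, 2), ...)` to project fields 0 and 2 of the wrapped row and to ignore the Boolean change flag. As an illustration only, that projection can be modelled without Flink. `KeyProjection.getKey` is a hypothetical stand-in that uses `Seq[Any]` in place of `Row` and a Scala tuple in place of `JTuple2`.

```scala
// Hypothetical stand-in for RowKeySelector.getKey; not the Flink class.
// Seq[Any] plays the role of org.apache.flink.types.Row and the tuple plays
// the role of JTuple2[JBool, Row]; the change flag does not affect the key.
object KeyProjection {
  def getKey(keyIndices: Array[Int], value: (Boolean, Seq[Any])): Seq[Any] = {
    val (_, row) = value
    // Keep only the key fields, in the order given by keyIndices.
    keyIndices.toSeq.map(i => row(i))
  }
}
```

The flag is ignored, so an upsert and a later delete of the same record produce the same key. Both messages are therefore routed to the same keyed state.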
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398017#comment-16398017 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174338981 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) --- End diff -- Format the code like this:
```
class QueryableTableSink(
    private val namePrefix: String,
    private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
  ...
```
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398016#comment-16398016 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174339658 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.harness.HarnessTestBase +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector} +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.Test + +class QueryableTableSinkTest extends HarnessTestBase { + @Test + def testRowSelector(): Unit = { +val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String])) +val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*)) + +val src = Row.of(List(1), "a", "b") +val key = selector.getKey(JTuple2.of(true, src)) + +assertEquals(Row.of(List(1), "b"), key) + } + + @Test + def testProcessFunction(): Unit = { +val queryConfig = new StreamQueryConfig() + .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10)) + +val keys = Array("id") +val keyType = new RowTypeInfo(TypeInformation.of(classOf[String])) +val fieldNames = Array("id", "is_manager", "name") +val fieldTypes: Array[TypeInformation[_]] = Array( + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]], + TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]]) +val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes) + +val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func) --- End diff -- Try to avoid using deprecated classes/methods. 
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398019#comment-16398019 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174339296 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. +* The method is called after [[TableSink.configure()]]. +* +* The keys array might be empty, if the table consists of a single (updated) record. +* If the table does not have a key and is append-only, the keys attribute is null. +* +* @param keys the field names of the table's keys, an empty array if the table has a single +* row, and null if the table is append-only and has no key. +*/ + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + /** +* Specifies whether the [[Table]] to write is append-only or not. +* +* @param isAppendOnly true if the table is append-only, false otherwise. 
+*/ + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " + +"as the table would grow infinitely") +} + } + + /** Returns the requested record type */ + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + /** Emits the DataStream. */ + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) + } + + /** Return a deep copy of the [[TableSink]]. */ + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { --- End diff -- The docs for overridden methods could be omitted.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398001#comment-16398001 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174338955 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. +* The method is called after [[TableSink.configure()]]. +* +* The keys array might be empty, if the table consists of a single (updated) record. +* If the table does not have a key and is append-only, the keys attribute is null. +* +* @param keys the field names of the table's keys, an empty array if the table has a single +* row, and null if the table is append-only and has no key. +*/ + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + /** +* Specifies whether the [[Table]] to write is append-only or not. +* +* @param isAppendOnly true if the table is append-only, false otherwise. 
+*/ + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " + --- End diff -- This line is too long (should be less than 100 characters).
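One way to satisfy the 100-character limit is to split the exception message into concatenated literals. The minimal sketch below uses a hypothetical standalone helper, `requireNotAppendOnly`, in place of the real `setIsAppendOnly` override. That override lives on the sink and takes a `JBool`.

```scala
// Hypothetical helper mirroring the check in QueryableTableSink.setIsAppendOnly,
// with the exception message split so every source line stays under 100 chars.
object AppendOnlyCheck {
  def requireNotAppendOnly(isAppendOnly: Boolean): Unit = {
    if (isAppendOnly) {
      throw new IllegalArgumentException(
        "A QueryableTableSink can not be used with append-only tables " +
          "as the table would grow infinitely")
    }
  }
}
```

Scala's built-in `require(!isAppendOnly, msg)` would also throw an `IllegalArgumentException`, but it prefixes the message with `requirement failed: `. The explicit `throw` keeps the original wording unchanged.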
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398000#comment-16398000 ] ASF GitHub Bot commented on FLINK-6968: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r174338947 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class QueryableTableSink(private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + /** +* Configures the unique key fields of the [[Table]] to write. +* The method is called after [[TableSink.configure()]]. +* +* The keys array might be empty, if the table consists of a single (updated) record. +* If the table does not have a key and is append-only, the keys attribute is null. +* +* @param keys the field names of the table's keys, an empty array if the table has a single +* row, and null if the table is append-only and has no key. +*/ + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + /** +* Specifies whether the [[Table]] to write is append-only or not. +* +* @param isAppendOnly true if the table is append-only, false otherwise. 
+*/ + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " + +"as the table would grow infinitely") +} + } + + /** Returns the requested record type */ + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + /** Emits the DataStream. */ + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) --- End diff -- This `process(processFunction)` method has been deprecated. Replace it with `process(KeyedProcessFunction)`.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397984#comment-16397984 ] ASF GitHub Bot commented on FLINK-6968: --- Github user liurenjie1024 commented on the issue: https://github.com/apache/flink/pull/5688 @twalthr Please help to review this.
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396822#comment-16396822 ] ASF GitHub Bot commented on FLINK-6968: --- GitHub user liurenjie1024 opened a pull request: https://github.com/apache/flink/pull/5688 [FLINK-6968][Table API & SQL] Add Queryable table sink.

## What is the purpose of the change

Streaming tables with a unique key are continuously updated. For example, queries with a non-windowed aggregation generate such tables. Commonly, such updating tables are emitted via an upsert table sink to an external datastore (k-v store, database) to make them accessible to applications. This change instead stores and maintains such a table in Flink's queryable state, so that the results can be queried directly from Flink without an external datastore.

## Brief change log

- *Add a QueryableStateTableSink.*
- *Make the sink's state queryable.*

## Verifying this change

This change added tests and can be verified as follows:

- *Added a test that validates that the state is stored.*
- *Manually verified the change by running a stateful streaming program on a 4-node cluster with 2 JobManagers and 4 TaskManagers, and by using QueryableStateClient to query the state.*

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink QueryableTableSink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5688.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5688

commit 0663e550216dfef8bf205f90d5ac8a0e7e77a42b
Author: liurenjie1024
Date: 2018-03-12T07:32:44Z

Code complete

commit 6d62d53f0bae65249ab69bddf7932e62ae1e7897
Author: liurenjie1024
Date: 2018-03-13T09:44:30Z

Add test

commit c9ffa6ecdd638a497b60f3f063b2d352b1b98059
Author: liurenjie1024
Date: 2018-03-13T10:43:19Z

Fix test style
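The pull request description says that non-windowed aggregations produce continuously updated tables with a unique key. As a plain-Scala illustration (not Flink code), the sketch below models a query such as `SELECT user, COUNT(*) FROM clicks GROUP BY user` that emits its results in Flink's retract encoding (`true` = add, `false` = retract): every update for a user who already has a result first retracts the old row and then adds the new one. `UpdatingCountSketch`, `countPerUser`, and the `clicks` table are hypothetical; Flink's actual aggregation operators are not shown.

```scala
// Hypothetical sketch: a non-windowed COUNT(*) GROUP BY user whose output
// is a retract stream of (isAdd, (user, count)) messages.
object UpdatingCountSketch {
  // true = add message, false = retract message.
  type Message = (Boolean, (String, Long))

  def countPerUser(users: Seq[String]): Seq[Message] = {
    // Current count per user; the user name is the result table's unique key.
    val counts = scala.collection.mutable.Map.empty[String, Long]
    val out = Seq.newBuilder[Message]
    for (user <- users) {
      counts.get(user) match {
        case Some(old) =>
          out += ((false, (user, old)))     // retract the previous result row
          out += ((true, (user, old + 1)))  // add the updated result row
          counts(user) = old + 1
        case None =>
          out += ((true, (user, 1L)))       // first result row for this key
          counts(user) = 1L
      }
    }
    out.result()
  }
}
```

For the input `Seq("jeff", "anna", "jeff")` the sketch emits four messages, ending with a retraction of `("jeff", 1)` followed by an add of `("jeff", 2)`. Replaying such a stream into a key-to-row map leaves only the latest count per user, which is the kind of table the issue proposes to keep in Flink's queryable state.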