[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581093#comment-16581093
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

alpinegizmo commented on issue #6434: [FLINK-6968] [table] Add Queryable table 
sink.
URL: https://github.com/apache/flink/pull/6434#issuecomment-413203702
 
 
   The documentation is pretty thin. It would be nice to explain that the 
queryable table sink can only be used in retract mode, and include a better 
example. I only understood what's going on after reading the test -- I found 
Row.of("jeff") much easier to understand than Row.of(1).
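   For illustration only, a minimal sketch of the kind of example the documentation could add, assuming a table keyed by a string "name" column whose aggregate is exposed under the state name "prefix-sm" (the state name, key field and value type below are assumptions modelled on the test, not documented API):

    // Hedged sketch: point-query the sink's state for the key "jeff".
    import org.apache.flink.api.common.JobID
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.queryablestate.client.QueryableStateClient
    import org.apache.flink.types.Row

    val client = new QueryableStateClient("localhost", 9069)
    val jobId = JobID.fromHexString("<running job id>")

    // The sink keys its state by the table's unique key, here the "name" column.
    val keyType = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO), Array("name"))
    val smDescriptor = new ValueStateDescriptor[java.lang.Integer](
      "sm", TypeInformation.of(classOf[java.lang.Integer]))

    val sm = client
      .getKvState(jobId, "prefix-sm", Row.of("jeff"), keyType, smDescriptor)
      .get()        // blocks until the state proxy answers
      .value()
    println(s"sum(money) for jeff = $sm")

    client.shutdownAndWait()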


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576527#comment-16576527
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209314950
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sink/queryable/QueryableTableSinkTest.scala
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sink.queryable
+
+import java.time.Duration
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.{Deadline, Time}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.queryablestate.client.QueryableStateClient
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.sinks.queryable.QueryableTableSink
+import org.apache.flink.types.Row
+import org.hamcrest.core.Is
+import org.junit.Assert._
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Rule, Test}
+
+
+class QueryableTableSinkTest extends QueryableSinkTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+  val _tempFolder = new TemporaryFolder
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  val _expectedException = ExpectedException.none()
+  @Rule
+  def expectedException: ExpectedException = _expectedException
+
+  def getStateBackend: StateBackend = {
+    val dbPath = tempFolder.newFolder().getAbsolutePath
+    val checkpointPath = tempFolder.newFolder().toURI.toString
+    val backend = new RocksDBStateBackend(checkpointPath)
+    backend.setDbStoragePath(dbPath)
+    backend
+  }
+
+  @Test
+  def testQueryableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // name, money
+    val data = List(("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4))
+    val source = new TestKVListSource[String, Int](data)
+
+    // select name, sum(money) as sm from t group by name
+    val t = env.addSource(source).toTable(tEnv, 'name, 'money)
+      .groupBy("name")
+      .select("name, sum(money) as sm")
+
+    val queryableSink = new QueryableTableSink("prefix",
+      new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(7)))
+
+    t.writeToSink(queryableSink)
+
+    val clusterClient = QueryableSinkTestBase.miniClusterResource.getClusterClient
+    val deadline = Deadline.now.plus(Duration.ofSeconds(100))
+
+    val autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env.getJavaEnv)
+    val client = new QueryableStateClient("localhost", 9084)
+
 
 Review comment:
   The client should be shut down at the end.
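
   For reference, a minimal sketch of one way to do that in the test, assuming QueryableStateClient#shutdownAndWait() (the blocking shutdown) is the intended call:

    val client = new QueryableStateClient("localhost", 9084)
    try {
      // ... submit the job, run the point queries and assertions ...
    } finally {
      // Release the client's network resources even if an assertion fails.
      client.shutdownAndWait()
    }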


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576452#comment-16576452
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209300017
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/queryable/QueryableTableSink.scala
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks.queryable
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.sinks.{TableSinkBase, UpsertStreamTableSink}
+import org.apache.flink.types.Row
+
+/**
+  * A QueryableTableSink stores a table in queryable state.
+  *
+  * This class stores the table in queryable state so that users can access the table data
+  * without depending on external storage.
+  *
+  * Since this is only a key-value store, users can currently only run point queries against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *     "prefix",
+  *     queryConfig)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  * }}}
+  *
+  * @param namePrefix prefix of the queryable state names (one state per stored column)
+  * @param queryConfig the StreamQueryConfig that controls idle state retention
+  */
+class QueryableTableSink(
+    private val namePrefix: String,
+    private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+    if (keys == null) {
+      throw new IllegalArgumentException("keys can't be null!")
+    }
+    this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+    if (isAppendOnly) {
+      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+        "tables as the table would grow infinitely")
+    }
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    val keyIndices = keys.map(getFieldNames.indexOf(_))
+    val keyTypes = keyIndices.map(getFieldTypes(_))
+
+    val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+    val processFunction = new QueryableStateProcessFunction(
+      namePrefix,
+      queryConfig,
+      keys,
+      getFieldNames,
+      getFieldTypes)
+
+    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+      .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+    new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+
+
+
 
 Review comment:
   Remove this block of blank lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576440#comment-16576440
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209297911
 
 

 ##
 File path: flink-libraries/flink-table/pom.xml
 ##
 @@ -186,6 +186,13 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-queryable-state-runtime_2.11</artifactId>
 
 Review comment:
   Replace the hardcoded 2.11 with ${scala.binary.version}, i.e. flink-queryable-state-runtime_${scala.binary.version}.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559159#comment-16559159
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 commented on issue #5688: [FLINK-6968][Table API & SQL] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/5688#issuecomment-408291783
 
 
   I'm sorry for messing this up with a bad rebase. I've opened a cleaner PR, 
https://github.com/apache/flink/pull/6434, for the same issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559156#comment-16559156
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 commented on issue #6434: [FLINK-6968][Table API & SQL] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#issuecomment-408291551
 
 
   I'm sorry for messing up https://github.com/apache/flink/pull/5688 with a 
bad rebase, so I have opened this new PR.
   
   @twalthr Could you please review this? I've addressed your comments from 
https://github.com/apache/flink/pull/5688 and added an integration test.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559155#comment-16559155
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

liurenjie1024 opened a new pull request #6434: [FLINK-6968][Table API & SQL] 
Add Queryable table sink.
URL: https://github.com/apache/flink/pull/6434
 
 
   ## What is the purpose of the change
   
   Streaming tables with a unique key are continuously updated. For example, 
queries with a non-windowed aggregation generate such tables. Commonly, such 
updating tables are emitted via an upsert table sink to an external datastore 
(k-v store, database) to make them accessible to applications.
   
   ## Brief change log
   
 - *Add a QueryableStateTableSink.*
 - *States are queryable.*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added test that validates that states will be stored.*
   
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
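
   To make the intended usage concrete, a sketch of how the sink could be wired up from the Table API (the source data, key column and retention times below are illustrative assumptions modelled on the added test, not documented API):

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
    import org.apache.flink.table.sinks.queryable.QueryableTableSink

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // A continuously updating table with the unique key "name".
    val t = env.fromCollection(Seq(("jeff", -1), ("dean", -2), ("jeff", 2)))
      .toTable(tEnv, 'name, 'money)
      .groupBy('name)
      .select('name, 'money.sum as 'sm)

    // Each stored column becomes queryable state named "<prefix>-<column>",
    // keyed by the table's unique key.
    val sink = new QueryableTableSink(
      "prefix",
      new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(7)))
    t.writeToSink(sink)

    env.execute()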
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490567#comment-16490567
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr I store each column separately to avoid encoding and schema 
management problems, and also to reduce unnecessary data transfer when querying 
data. Issuing multiple requests for multiple fields is one of the drawbacks.
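
To make the trade-off concrete, a conceptual sketch (not the PR's exact code) of the per-column storage described above; the class name and field handling here are illustrative assumptions:

    import java.lang.{Boolean => JBool}

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    // Every non-key column lives in its own queryable ValueState named
    // "<prefix>-<column>", so a client can fetch a single column without
    // deserializing a whole row, at the price of one request per field.
    class PerColumnQueryableSketch(
        prefix: String,
        keys: Array[String],
        fieldNames: Array[String],
        fieldTypes: Array[TypeInformation[_]])
      extends KeyedProcessFunction[Row, JTuple2[JBool, Row], Void] {

      private var nonKeyIndices: Array[Int] = _
      private var states: Array[ValueState[AnyRef]] = _

      override def open(parameters: Configuration): Unit = {
        nonKeyIndices = fieldNames.indices.filter(i => !keys.contains(fieldNames(i))).toArray
        states = nonKeyIndices.map { i =>
          val desc = new ValueStateDescriptor[AnyRef](
            fieldNames(i), fieldTypes(i).asInstanceOf[TypeInformation[AnyRef]])
          desc.setQueryable(s"$prefix-${fieldNames(i)}") // expose to QueryableStateClient
          getRuntimeContext.getState(desc)
        }
      }

      override def processElement(
          value: JTuple2[JBool, Row],
          ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
          out: Collector[Void]): Unit = {
        val isUpsert = value.f0
        nonKeyIndices.zip(states).foreach { case (i, state) =>
          if (isUpsert) state.update(value.f1.getField(i)) // upsert: overwrite the column
          else state.clear()                               // delete: drop the stored column
        }
      }
    }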


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16490564#comment-16490564
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190859490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
     ctx: KeyedProcessFunction[K, I, O]#Context,
     currentTime: Long): Unit = {
-    if (stateCleaningEnabled) {
+    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

The reason I put it in the same PR is that I didn't want it to block 
this PR, but I also agree that we should move it to a separate one.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486957#comment-16486957
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190176842
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
     ctx: KeyedProcessFunction[K, I, O]#Context,
     currentTime: Long): Unit = {
-    if (stateCleaningEnabled) {
+    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

We implemented state cleanup in processing time because it is easier for 
users to reason about and doesn't interfere that much with event-time 
processing (it is not yet possible to distinguish timers). However, it also has 
a few shortcomings, such as state being cleared when recovering a query from a 
savepoint (which we don't really encourage at the moment). 

Anyway, introducing event-time state cleanup should definitely go into a 
separate issue and PR.
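
For background, a hedged sketch of the processing-time cleanup pattern being discussed (a simplified stand-in for KeyedProcessFunctionWithCleanupState, not its actual code): the last registered cleanup time is tracked in state, pushed forward while the key stays active, and the keyed state is cleared when the timer finally fires.

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    class CleanupTimerSketch(minRetentionMs: Long, maxRetentionMs: Long)
      extends KeyedProcessFunction[String, String, String] {

      private var lastValue: ValueState[String] = _
      private var cleanupTime: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        lastValue = getRuntimeContext.getState(
          new ValueStateDescriptor[String]("lastValue", classOf[String]))
        cleanupTime = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("cleanupTime", classOf[java.lang.Long]))
      }

      override def processElement(
          value: String,
          ctx: KeyedProcessFunction[String, String, String]#Context,
          out: Collector[String]): Unit = {
        lastValue.update(value)
        val now = ctx.timerService().currentProcessingTime()
        val registered = cleanupTime.value()
        // Only register a new timer when it extends retention noticeably,
        // to avoid registering one timer per record (min/max retention gap).
        if (registered == null || now + minRetentionMs > registered) {
          ctx.timerService().registerProcessingTimeTimer(now + maxRetentionMs)
          cleanupTime.update(now + maxRetentionMs)
        }
        out.collect(value)
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        val registered = cleanupTime.value()
        // Clear the key's state only when the newest cleanup timer fires.
        if (registered != null && timestamp == registered.longValue()) {
          lastValue.clear()
          cleanupTime.clear()
        }
      }
    }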


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486823#comment-16486823
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190136859
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486829#comment-16486829
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190141550
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 ---
@@ -78,6 +82,20 @@ public int numKeyedStateEntries() {
}
}
 
+   public  S getState(K key, StateDescriptor stateDesc) throws Exception {
--- End diff --

Is this change necessary? We should only modify code outside of 
`flink-table` if it is urgently needed.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with a unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486824#comment-16486824
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190137151
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486830#comment-16486830
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190141671
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]),
+      TypeInformation.of(classOf[String]))
+    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
+
+    val src = Row.of(List(1), "a", "b")
+    val key = selector.getKey(JTuple2.of(true, src))
+
+    assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
+
+    val keys = Array("id")
+    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+    val fieldNames = Array("id", "is_manager", "name")
+    val fieldTypes: Array[TypeInformation[_]] = Array(
+      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+    val func = new QueryableStateProcessFunction(
+      "test",
+      queryConfig,
+      keys,
+      fieldNames,
+      fieldTypes,
+      TimeDomain.PROCESSING_TIME)
+
+    val operator = new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
+
+    val testHarness = createHarnessTester(operator,
+      new RowKeySelector(Array(0), keyType),
+      keyType)
+
+    testHarness.open()
+
+    val stateDesc1 = new ValueStateDescriptor[JBool]("is_manager",
+      TypeInformation.of(classOf[JBool]))
+    stateDesc1.initializeSerializerUnlessSet(operator.getExecutionConfig)
+    val stateDesc2 = new ValueStateDescriptor[String]("name", TypeInformation.of(classOf[String]))
+    stateDesc2.initializeSerializerUnlessSet(operator.getExecutionConfig)
+    val key1 = Row.of("1")
+    val key2 = Row.of("2")
+
+    testHarness.processElement(JTuple2.of(true, Row.of("1", JBool.valueOf(true), "jeff")), 2)
+    testHarness.processElement(JTuple2.of(true, Row.of("2", JBool.valueOf(false), "dean")), 6)
+
+    val stateOf = (key: Row, sd: ValueStateDescriptor[_]) => {
+      testHarness.getState(key, sd).value().asInstanceOf[AnyRef]
+    }
+
+    var expectedData = Array(
+      Row.of(JBool.valueOf(true), "jeff"),
+      Row.of(JBool.valueOf(false), "dean"))
+    var storedData = Array(
+      Row.of(stateOf(key1, stateDesc1), stateOf(key1, stateDesc2)),
+  

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486831#comment-16486831
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140977
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486828#comment-16486828
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140777
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486826#comment-16486826
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * Once the program is running, the state can be accessed with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486827#comment-16486827
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190137656
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores a table in queryable state.
+  *
+  * This class stores the table in Flink's queryable state so that users can access
+  * the table data without depending on external storage.
+  *
+  * Since this is only a key-value store, users can currently only run point
+  * queries against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * While the program is running, users can access the state with a QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *       jobId,
+  *       "prefix-column1",
+  *       Row.of(1),
+  *       new RowTypeInfo(Array[TypeInformation[_]](TypeInformation.of(classOf[Int])), Array("id")),
+  *       stateDescriptor)
+  *     .get()
+  * }}}
+  *
+  * @param namePrefix prefix used to build the names of the queryable states
+  * @param queryConfig streaming query configuration, e.g. idle state retention time
+  * @param cleanupTimeDomain optional time domain for the state clean-up timers
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486822#comment-16486822
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190135653
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
--- End diff --

Update the comment.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486825#comment-16486825
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190136036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
 ctx: KeyedProcessFunction[K, I, O]#Context,
 currentTime: Long): Unit = {
-if (stateCleaningEnabled) {
+registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

@fhueske is there a reason why we have no event-time clean-up state so far? 
I think it would make sense to move this change to a separate PR. We should 
make the notion of time configurable through `StreamQueryConfig`.
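
For illustration, here is a minimal sketch (not the PR's code) of what an event-time counterpart could look like inside `KeyedProcessFunctionWithCleanupState`, assuming the shared `registerCleanupTimer(ctx, time, timeDomain)` helper shown in the diff above and the existing `stateCleaningEnabled` / `maxRetentionTime` members of the clean-up base class:

```scala
// Sketch only: event-time variant mirroring registerProcessingCleanupTimer
// from the diff above; the final signature in the PR may differ.
protected def registerEventCleanupTimer(
    ctx: KeyedProcessFunction[K, I, O]#Context,
    currentTime: Long): Unit = {
  registerCleanupTimer(ctx, currentTime, TimeDomain.EVENT_TIME)
}

// Assumed shape of the shared helper: pick the timer matching the requested
// TimeDomain and schedule the clean-up at currentTime + maxRetentionTime.
private def registerCleanupTimer(
    ctx: KeyedProcessFunction[K, I, O]#Context,
    currentTime: Long,
    timeDomain: TimeDomain): Unit = {
  if (stateCleaningEnabled) {
    val cleanupTime = currentTime + maxRetentionTime
    timeDomain match {
      case TimeDomain.EVENT_TIME =>
        ctx.timerService().registerEventTimeTimer(cleanupTime)
      case TimeDomain.PROCESSING_TIME =>
        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
    }
  }
}
```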




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-04-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447755#comment-16447755
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@suez1224 Conflict resolved.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420721#comment-16420721
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5688
  
Could you please rebase your PR to resolve the conflict? Thanks.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405945#comment-16405945
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175680499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+class RowKeySelector(
+  private val keyIndices: Array[Int],
+  @transient private val returnType: TypeInformation[Row])
+  extends KeySelector[JTuple2[JBool, Row], Row]
+with ResultTypeQueryable[Row] {
+
+  override def getKey(value: JTuple2[JBool, Row]): Row = {
+val keys = keyIndices
+
+val srcRow = value.f1
+
+val destRow = new Row(keys.length)
+var i = 0
+while (i < keys.length) {
+  destRow.setField(i, srcRow.getField(keys(i)))
+  i += 1
+}
+
+destRow
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+class QueryableStateProcessFunction(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig,
+  private val keyNames: Array[String],
+  private val fieldNames: Array[String],
+  private val fieldTypes: Array[TypeInformation[_]])
+  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], 
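
The quoted diff is cut off at the `QueryableStateProcessFunction` declaration. As a rough, hedged sketch only (not the PR's implementation), the element handling plausibly looks like the following: one queryable `ValueState` per non-key column registered as `"<namePrefix>-<fieldName>"`, an upsert when the `f0` flag is true, a clear when it is false, and registration of the idle-state clean-up timer. The class name, member names, and the exact state layout below are assumptions.

```scala
import java.lang.{Boolean => JBool}

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Sketch only; the real QueryableStateProcessFunction in this PR may differ.
class QueryableStateProcessFunctionSketch(
    namePrefix: String,
    queryConfig: StreamQueryConfig,
    keyNames: Array[String],
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]])
  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], Void](queryConfig) {

  private var nonKeyIndices: Array[Int] = _
  private var states: Array[ValueState[AnyRef]] = _

  override def open(parameters: Configuration): Unit = {
    nonKeyIndices = fieldNames.indices.filter(i => !keyNames.contains(fieldNames(i))).toArray
    states = nonKeyIndices.map { i =>
      // Register one queryable ValueState per non-key column (assumed layout).
      val desc = new ValueStateDescriptor[AnyRef](
        s"$namePrefix-${fieldNames(i)}",
        fieldTypes(i).asInstanceOf[TypeInformation[AnyRef]])
      desc.setQueryable(s"$namePrefix-${fieldNames(i)}")
      getRuntimeContext.getState(desc)
    }
  }

  override def processElement(
      value: JTuple2[JBool, Row],
      ctx: ProcessFunction[JTuple2[JBool, Row], Void]#Context,
      out: Collector[Void]): Unit = {
    if (value.f0) {
      // Upsert: write each non-key column of the row into its queryable state.
      var i = 0
      while (i < states.length) {
        states(i).update(value.f1.getField(nonKeyIndices(i)))
        i += 1
      }
    } else {
      // Delete: clear the state held for the current key.
      states.foreach(_.clear())
    }
    // (State clean-up in onTimer is omitted in this sketch.)
    registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
  }
}
```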

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405944#comment-16405944
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175680339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
--- End diff --

Java doc added.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404297#comment-16404297
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175315979
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
--- End diff --

Besides @xccui's suggestion of adding formal docs, could you also add a Javadoc-style comment here explaining what this table sink does and how it is intended to be used?




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404298#comment-16404298
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175317673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+class RowKeySelector(
+  private val keyIndices: Array[Int],
+  @transient private val returnType: TypeInformation[Row])
+  extends KeySelector[JTuple2[JBool, Row], Row]
+with ResultTypeQueryable[Row] {
+
+  override def getKey(value: JTuple2[JBool, Row]): Row = {
+val keys = keyIndices
+
+val srcRow = value.f1
+
+val destRow = new Row(keys.length)
+var i = 0
+while (i < keys.length) {
+  destRow.setField(i, srcRow.getField(keys(i)))
+  i += 1
+}
+
+destRow
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+class QueryableStateProcessFunction(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig,
+  private val keyNames: Array[String],
+  private val fieldNames: Array[String],
+  private val fieldTypes: Array[TypeInformation[_]])
+  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401424#comment-16401424
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174990348
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+  private val namePrefix: String,
--- End diff --

Fixed.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398795#comment-16398795
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174511059
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+  private val namePrefix: String,
--- End diff --

Use a four-space indent for parameter declarations (this also applies to method parameters).
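
For illustration only (placeholder names, following the convention described above):

```scala
// Constructor parameters: four-space continuation indent.
class ExampleSink(
    private val namePrefix: String,
    private val retentionMillis: Long) {

  // Wrapped method parameter lists get the same four-space indent.
  def configure(
      fieldNames: Array[String],
      fieldTypes: Array[String]): Unit = {
    // ...
  }
}
```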




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398173#comment-16398173
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@xccui Thanks for your suggestions.

1. Adding an IT test for this is difficult: I would need to make sure elements keep being processed while the job stays running, which is hard to arrange in a test. I will do some manual testing instead (see the sketch below).
2. I'll add docs for that.
3. I'll squash the commits when the review is done.
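
A rough sketch of such a manual check (not part of this PR): it assumes a job writing to the QueryableTableSink is already running, that its JobID is passed on the command line, and that the sink registered a String column under `"prefix-name"` keyed by a one-column String row; host, port, and all names are placeholders.

```scala
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.types.Row

object ManualQueryableSinkCheck {

  def main(args: Array[String]): Unit = {
    val jobId = JobID.fromHexString(args(0))
    val client = new QueryableStateClient("localhost", 9069)

    val keyType = new RowTypeInfo(
      Array[TypeInformation[_]](TypeInformation.of(classOf[String])), Array("id"))
    val descriptor = new ValueStateDescriptor[String](
      "prefix-name", TypeInformation.of(classOf[String]))

    // Poll while the job keeps running, until the sink has materialized a value.
    var result: String = null
    while (result == null) {
      try {
        result = client
          .getKvState(jobId, "prefix-name", Row.of("key1"), keyType, descriptor)
          .get()
          .value()
      } catch {
        case _: Exception => // state not materialized yet, retry
      }
      if (result == null) {
        Thread.sleep(500)
      }
    }
    println(s"queried value: $result")
  }
}
```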




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398169#comment-16398169
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366980
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
--- End diff --

Same as above.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398166#comment-16398166
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single (updated) record.
+* If the table does not have a key and is append-only, the keys attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

I'm waiting for another [PR](https://github.com/apache/flink/pull/5680) to be merged.



[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398018#comment-16398018
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338993
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
--- End diff --

Remove unused imports.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398015#comment-16398015
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
--- End diff --

Use the full class name for `[[Table]]` since it's not imported.
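
For example (illustration only):

```scala
// Instead of [[Table]], link with the fully qualified name:
/**
  * Configures the unique key fields of the [[org.apache.flink.table.api.Table]] to write.
  */
```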




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398020#comment-16398020
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339810
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
+
+val testHarness = createHarnessTester(operator,
+  new RowKeySelector(Array(0), keyType),
+  keyType)
+
+testHarness.open()
+
+
--- End diff --

Remove extra blank lines.




[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398017#comment-16398017
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
--- End diff --

Format the code like this:
```
class QueryableTableSink(
    private val namePrefix: String,
    private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
...
```



> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398016#comment-16398016
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339658
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
--- End diff --

Try to avoid using deprecated classes/methods.
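
For illustration only, the harness could wrap a non-deprecated `KeyedProcessOperator` instead. This assumes the sink function is reworked as a `KeyedProcessFunction` (a hypothetical class, sketched further down in this thread), so it is not code from this PR:

```
import org.apache.flink.streaming.api.operators.KeyedProcessOperator

// Hypothetical: keyedFunc would be the KeyedProcessFunction variant of
// QueryableStateProcessFunction, constructed with the same arguments.
val operator =
  new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](keyedFunc)
```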


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398019#comment-16398019
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339296
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  /** Return a deep copy of the [[TableSink]]. */
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
--- End diff --

The docs for overridden methods could be omitted.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398001#comment-16398001
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
--- End diff --

This line is too long (should be less than 100 characters).
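
One way to stay under the limit, keeping the same message text as in the diff, is to break before the exception argument (a formatting sketch only):

```
throw new IllegalArgumentException(
  "A QueryableTableSink can not be used with append-only tables " +
    "as the table would grow infinitely")
```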


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398000#comment-16398000
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338947
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

This `process(processFunction)` method has been deprecated. Replace it with 
`process(KeyedProcessFunction)`.
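
As a rough illustration of this suggestion, the state-writing logic could be expressed as a `KeyedProcessFunction`, which the non-deprecated `process(KeyedProcessFunction)` overload accepts. The class name and the body comment below are assumptions, not code from this PR:

```
import java.lang.{Boolean => JBool}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical replacement for QueryableStateProcessFunction: same role,
// but typed as a KeyedProcessFunction so keyedStream.process(...) is not
// deprecated.
class QueryableStateKeyedProcessFunction
  extends KeyedProcessFunction[Row, JTuple2[JBool, Row], Void] {

  override def processElement(
      value: JTuple2[JBool, Row],
      ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
      out: Collector[Void]): Unit = {
    // value.f0 is the upsert flag and value.f1 the row; the ValueState
    // updates from the original function would go here.
  }
}

// dataStream
//   .keyBy(new RowKeySelector(keyIndices, keySelectorType))
//   .process(new QueryableStateKeyedProcessFunction)
```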


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: 

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397984#comment-16397984
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr Please help to review this.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396822#comment-16396822
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

GitHub user liurenjie1024 opened a pull request:

https://github.com/apache/flink/pull/5688

[FLINK-6968][Table API & SQL] Add Queryable table sink.

## What is the purpose of the change

Streaming tables with unique key are continuously updated. For example 
queries with a non-windowed aggregation generate such tables. Commonly, such 
updating tables are emitted via an upsert table sink to an external datastore 
(k-v store, database) to make it accessible to applications. This PR adds a 
queryable table sink that stores such a table as queryable state in Flink, so 
the results can be queried directly from Flink without an external data store.

## Brief change log

  - *Add a QueryableTableSink.*
  - *State written by the sink is queryable via Flink's queryable state.*

## Verifying this change

This change added tests and can be verified as follows:
  - *Added a test that validates that state is stored.*
  - *Manually verified the change by running a stateful streaming program on a 
4-node cluster with 2 JobManagers and 4 TaskManagers and querying its state 
with QueryableStateClient (a minimal client sketch follows below).*

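For reference, a minimal sketch of such a verification client (Scala, using the `flink-queryable-state-client-java` API) could look like the following. The proxy host/port, job id, queryable state name, and row layout are placeholders and depend on how the sink registers its state:

```
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.types.Row

object QueryableSinkClientSketch {
  def main(args: Array[String]): Unit = {
    // Placeholders: a TaskManager's queryable-state proxy address and the
    // id of the running job that contains the queryable sink.
    val client = new QueryableStateClient("localhost", 9069)
    val jobId = JobID.fromHexString("00000000000000000000000000000000")

    // The key row must match the sink's configured key fields.
    val key = Row.of("some-id")
    val keyType: TypeInformation[Row] =
      new RowTypeInfo(TypeInformation.of(classOf[String]))

    // Assumed state name and value layout; both depend on how the sink
    // registers its queryable state.
    val valueDescriptor = new ValueStateDescriptor[Row](
      "value",
      new RowTypeInfo(
        TypeInformation.of(classOf[String]),
        TypeInformation.of(classOf[java.lang.Boolean]),
        TypeInformation.of(classOf[String])))

    val future =
      client.getKvState(jobId, "prefix-value", key, keyType, valueDescriptor)

    // Blocks until the proxy answers, then prints the stored row.
    println(future.join().value())
  }
}
```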

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink QueryableTableSink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5688


commit 0663e550216dfef8bf205f90d5ac8a0e7e77a42b
Author: liurenjie1024 
Date:   2018-03-12T07:32:44Z

Code complete

commit 6d62d53f0bae65249ab69bddf7932e62ae1e7897
Author: liurenjie1024 
Date:   2018-03-13T09:44:30Z

Add test

commit c9ffa6ecdd638a497b60f3f063b2d352b1b98059
Author: liurenjie1024 
Date:   2018-03-13T10:43:19Z

Fix test style




> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)