[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-07-31 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/4225


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125039522
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 ---
@@ -534,33 +537,66 @@ public Integer getKey(Tuple2 value) 
throws Exception {
 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} 
until
 * expected equals the value of the result tuple's second 
field.
 */
-   private void executeValueQuery(final Deadline deadline,
-   final QueryableStateClient client, final JobID jobId,
-   final QueryableStateStream> 
queryableState,
-   final long expected) throws Exception {
+   private void executeValueQuery(
--- End diff --

👍 




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125039482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * {@link TypeInformation} for {@link VoidNamespace}.
+ */
+@Public
+public class VoidNamespaceTypeInfo extends TypeInformation {
+
+   private static final long serialVersionUID = 5453679706408610586L;
+
+   public static final VoidNamespaceTypeInfo INSTANCE = new 
VoidNamespaceTypeInfo();
+
+   @Override
+   @PublicEvolving
--- End diff --

sounds good!




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125007394
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 ---
@@ -534,33 +537,66 @@ public Integer getKey(Tuple2 value) 
throws Exception {
 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} 
until
 * expected equals the value of the result tuple's second 
field.
 */
-   private void executeValueQuery(final Deadline deadline,
-   final QueryableStateClient client, final JobID jobId,
-   final QueryableStateStream> 
queryableState,
-   final long expected) throws Exception {
+   private void executeValueQuery(
--- End diff --

Yes, you are right! "Value" here is used in the sense of "asking for the value 
of a key", not in the sense of `ValueState`, which is a scalar. I will change this.




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125007122
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 ---
@@ -234,38 +239,28 @@ public Integer getKey(Tuple2 value) 
throws Exception {
allNonZero = false;
}
 
-   final byte[] serializedKey = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-   key,
-   
queryableState.getKeySerializer(),
-   VoidNamespace.INSTANCE,
-   
VoidNamespaceSerializer.INSTANCE);
-
-   Future serializedResult = 
getKvStateWithRetries(
+   Future> 
serializedResult = getKvStateWithRetries(
--- End diff --

done!




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125006493
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 ---
@@ -267,6 +293,177 @@ public void shutDown() {
}
 
/**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key we are interested 
in.
+* @param keyTypeHint   A {@link TypeHint} used 
to extract the type of the key.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+   final TypeHint keyTypeHint,
+   final StateDescriptor stateDescriptor) {
+
+   Preconditions.checkNotNull(keyTypeHint);
+
+   TypeInformation keyTypeInfo = keyTypeHint.getTypeInfo();
+   return getKvState(jobId, queryableStateName, key, keyTypeInfo, 
stateDescriptor);
+   }
+
+   /**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key we are interested 
in.
+* @param keyTypeInfo   The {@link 
TypeInformation} of the key.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+   final TypeInformation keyTypeInfo,
+   final StateDescriptor stateDescriptor) {
+
+   Preconditions.checkNotNull(keyTypeInfo);
+
+   return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
+   keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
+   }
+
+   /**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key that the state we 
request is associated with.
+* @param namespace The namespace 
of the state.
+* @param keyTypeInfo   The {@link 
TypeInformation} of the keys.
+* @param namespaceTypeInfo The {@link 
TypeInformation} of the namespace.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+ 

[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125006417
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * {@link TypeInformation} for {@link VoidNamespace}.
+ */
+@Public
+public class VoidNamespaceTypeInfo extends TypeInformation {
+
+   private static final long serialVersionUID = 5453679706408610586L;
+
+   public static final VoidNamespaceTypeInfo INSTANCE = new 
VoidNamespaceTypeInfo();
+
+   @Override
+   @PublicEvolving
--- End diff --

I originally had it as you describe, but changed it after checking the other 
classes. 
I think the intention is to tell the user that the `TypeInformation` will 
always be there (thus `@Public`) but that the implementation may change 
(`@PublicEvolving`). 

For uniformity, I would suggest keeping it like the others and creating a 
separate issue that changes all the related annotations. I think this will 
lead to a more consistent approach, and whoever set these annotations in the 
first place will have the opportunity to say whether the change seems OK. 




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r124999382
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 ---
@@ -234,38 +239,28 @@ public Integer getKey(Tuple2 value) 
throws Exception {
allNonZero = false;
}
 
-   final byte[] serializedKey = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-   key,
-   
queryableState.getKeySerializer(),
-   VoidNamespace.INSTANCE,
-   
VoidNamespaceSerializer.INSTANCE);
-
-   Future serializedResult = 
getKvStateWithRetries(
+   Future> 
serializedResult = getKvStateWithRetries(
--- End diff --

Should probably not be called `serializedResult`.




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r124998957
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * {@link TypeInformation} for {@link VoidNamespace}.
+ */
+@Public
+public class VoidNamespaceTypeInfo extends TypeInformation {
+
+   private static final long serialVersionUID = 5453679706408610586L;
+
+   public static final VoidNamespaceTypeInfo INSTANCE = new 
VoidNamespaceTypeInfo();
+
+   @Override
+   @PublicEvolving
--- End diff --

I don't think we need these `@PublicEvolving` annotations here. I know that 
they are on all the `TypeInformation` subclasses but I also don't think they're 
required there. Only on the interface/base class should they be needed, IMHO. 
What do you think?
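
The layout proposed in this comment can be sketched roughly as follows. This is only an illustration: `SketchPublicEvolving`, `SketchTypeInfo` and `SketchVoidNamespaceTypeInfo` are hypothetical stand-ins, not Flink's annotation or classes, and the runtime retention is chosen purely so the demo can inspect the marker via reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical stand-in for a stability marker; runtime retention is only for this demo. */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface SketchPublicEvolving {}

/** Base class: the only place where the method carries the marker. */
abstract class SketchTypeInfo {
    @SketchPublicEvolving
    abstract int getArity();
}

/** Subclass: overrides the method without repeating the marker. */
final class SketchVoidNamespaceTypeInfo extends SketchTypeInfo {
    @Override
    int getArity() {
        return 0; // illustrative value only
    }
}

final class AnnotationPlacementSketch {
    /** Walks up the class hierarchy and reports whether any declaration of the method is marked. */
    static boolean isMarkedEvolving(Class<?> type, String methodName) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                Method m = c.getDeclaredMethod(methodName);
                if (m.isAnnotationPresent(SketchPublicEvolving.class)) {
                    return true;
                }
            } catch (NoSuchMethodException ignored) {
                // Not declared at this level; keep climbing.
            }
        }
        return false;
    }
}
```

With this placement the stability contract is stated once, on the base declaration, and a reader or tool that wants it for a subclass looks at the overridden method.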




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r124999617
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 ---
@@ -534,33 +537,66 @@ public Integer getKey(Tuple2 value) 
throws Exception {
 * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} 
until
 * expected equals the value of the result tuple's second 
field.
 */
-   private void executeValueQuery(final Deadline deadline,
-   final QueryableStateClient client, final JobID jobId,
-   final QueryableStateStream> 
queryableState,
-   final long expected) throws Exception {
+   private void executeValueQuery(
--- End diff --

There's nothing in the method signature that restricts this to 
`ValueState`. I think this can just be `executeQuery()`.
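
To illustrate why the helper need not be tied to `ValueState`, here is a minimal sketch of a state-agnostic retry loop in the spirit of the Javadoc above (query each key until the expected value shows up or a deadline passes). The class `RetryQuerySketch`, the `IntFunction` query parameter and the 10 ms back-off are assumptions made for this example, not the test's actual code.

```java
import java.time.Duration;
import java.util.function.IntFunction;

final class RetryQuerySketch {

    /**
     * Polls the query for every key in [0, numKeys) until each one returns the
     * expected value or the timeout elapses. Returns true if all keys matched in time.
     */
    static boolean executeQuery(
            IntFunction<Long> query, int numKeys, long expected, Duration timeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        for (int key = 0; key < numKeys; key++) {
            while (true) {
                Long value = query.apply(key);
                if (value != null && value == expected) {
                    break; // this key has caught up; move on to the next one
                }
                if (System.nanoTime() - deadlineNanos >= 0) {
                    return false; // deadline passed before the state caught up
                }
                try {
                    Thread.sleep(10); // brief back-off before retrying
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }
}
```

Nothing in this signature mentions a particular state type, which is the point of the suggested rename to `executeQuery()`.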




[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r124998338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 ---
@@ -157,10 +168,16 @@ public QueryableStateClient(
this.lookupService = lookupService;
this.kvStateClient = networkClient;
this.executionContext = actorSystem.dispatcher();
+   this.executionConfig = new ExecutionConfig();
 
this.lookupService.start();
}
 
+   /** Gets the {@link ExecutionConfig} object. */
--- End diff --

I think we should also allow a setter, so that users can set the 
`ExecutionConfig` from a `DataStream`. Also, to be in line with `DataStream` we 
should call this `getExecutionConfig()`.
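
A minimal sketch of the accessor pair suggested here. `SketchExecutionConfig` and `SketchQueryableStateClient` are hypothetical stand-ins rather than the Flink classes; only the name `getExecutionConfig()` and the idea of adding a setter come from the comment, and the null check is an assumption.

```java
import java.util.Objects;

/** Hypothetical stand-in for an execution config; holds one illustrative flag. */
final class SketchExecutionConfig {
    private boolean objectReuse;

    SketchExecutionConfig enableObjectReuse() {
        this.objectReuse = true;
        return this;
    }

    boolean isObjectReuseEnabled() {
        return objectReuse;
    }
}

/** Hypothetical client showing the getter/setter pair; not the real QueryableStateClient. */
final class SketchQueryableStateClient {
    // Starts with a default config, mirroring `new ExecutionConfig()` in the diff.
    private SketchExecutionConfig executionConfig = new SketchExecutionConfig();

    /** Named like the accessor on DataStream, as suggested in the review. */
    SketchExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /** Lets callers supply the config taken from their job; rejects null (an assumption). */
    void setExecutionConfig(SketchExecutionConfig config) {
        this.executionConfig = Objects.requireNonNull(config, "config");
    }
}
```

A caller would then pass the configuration obtained from the job's stream to the client before issuing queries, so that both sides derive their serializers from the same settings.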


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4225#discussion_r125000171
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 ---
@@ -267,6 +293,177 @@ public void shutDown() {
}
 
/**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key we are interested 
in.
+* @param keyTypeHint   A {@link TypeHint} used 
to extract the type of the key.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+   final TypeHint keyTypeHint,
+   final StateDescriptor stateDescriptor) {
+
+   Preconditions.checkNotNull(keyTypeHint);
+
+   TypeInformation keyTypeInfo = keyTypeHint.getTypeInfo();
+   return getKvState(jobId, queryableStateName, key, keyTypeInfo, 
stateDescriptor);
+   }
+
+   /**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key we are interested 
in.
+* @param keyTypeInfo   The {@link 
TypeInformation} of the key.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+   final TypeInformation keyTypeInfo,
+   final StateDescriptor stateDescriptor) {
+
+   Preconditions.checkNotNull(keyTypeInfo);
+
+   return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
+   keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
+   }
+
+   /**
+* Returns a future holding the request result.
+*
+* If the server does not serve a KvState instance with the given ID,
+* the Future will be failed with a {@link UnknownKvStateID}.
+*
+* If the KvState instance does not hold any data for the given key
+* and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
+*
+* All other failures are forwarded to the Future.
+*
+* @param jobId JobID of the job the queryable 
state belongs to.
+* @param queryableStateName Name under which the state is queryable.
+* @param key   The key that the state we 
request is associated with.
+* @param namespace The namespace 
of the state.
+* @param keyTypeInfo   The {@link 
TypeInformation} of the keys.
+* @param namespaceTypeInfo The {@link 
TypeInformation} of the namespace.
+* @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
+* @return Future holding the result.
+*/
+   @PublicEvolving
+   public  Future getKvState(
+   final JobID jobId,
+   final String queryableStateName,
+   final K key,
+ 
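
The quoted diff is cut off at this point. The overload pattern it shows (convenience overloads that fill in defaults and delegate to the most general method, with a missing key or namespace failing the returned future) can be sketched as below. Everything here is hypothetical: `OverloadChainSketch`, its in-memory map, the string namespace marker and the use of `CompletableFuture` and `IllegalStateException` are assumptions for illustration, not the signatures from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory stand-in for a queryable state client; not Flink's API. */
final class OverloadChainSketch {
    /** Stand-in for the "no namespace" marker (VoidNamespace.INSTANCE in the diff). */
    static final String VOID_NAMESPACE = "<void>";

    private final Map<String, Long> store = new HashMap<>();

    void put(String namespace, String key, long value) {
        store.put(namespace + "/" + key, value);
    }

    /** Convenience overload: fills in the void namespace and delegates. */
    CompletableFuture<Long> getKvState(String key) {
        Objects.requireNonNull(key, "key");
        return getKvState(key, VOID_NAMESPACE);
    }

    /** Most general overload: the only one that actually performs the lookup. */
    CompletableFuture<Long> getKvState(String key, String namespace) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        Long value = store.get(namespace + "/" + key);
        if (value == null) {
            // Mirrors the Javadoc: a missing key or namespace fails the future.
            result.completeExceptionally(
                    new IllegalStateException("unknown key or namespace: " + key));
        } else {
            result.complete(value);
        }
        return result;
    }
}
```

Keeping the lookup in a single method means the argument checks and failure handling live in one place, while the shorter overloads stay one-liners.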

[GitHub] flink pull request #4225: [FLINK-7044] [queryable-st] Allow to specify names...

2017-06-29 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4225

[FLINK-7044] [queryable-st] Allow to specify namespace and descriptor in 
the query.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4225.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4225


commit 4402fad5b144e5c7fb52fd475edbfa2bcc4c1c67
Author: kkloudas 
Date:   2017-06-19T13:01:40Z

[FLINK-7044] [queryable-st] Allow to specify namespace and descriptor in 
query.



