Hi Dawid,

sorry for the late reply. I was fixing some issues with queryable state and may now have tracked down the cause of your error: you may be seeing a race condition in the MemoryStateBackend (the default state backend), as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to deserialise list state; KvStateRequestSerializer#deserializeValue() will not work. Thanks for the tip regarding KvStateRequestSerializer#serializeList: it was indeed not used, since the list state backends had their own serialisation function. We removed KvStateRequestSerializer#serializeList as well as the queryable list state sink for 1.2 and up.

Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> Hi Nico, Ufuk,
>
> Thanks for diving into this issue.
>
> @Nico
>
> I don't think that's the problem. The code can be reproduced exactly in
> Java. I am using a different ListStateDescriptor constructor than you did.
>
> You used:
>
> public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
>
> While I used:
>
> public ListStateDescriptor(String name, Class<T> typeClass)
>
> I think the problem is with the way I deserialized the value on the
> QueryClient side, as I tried to use:
>
> KvStateRequestSerializer.deserializeList(serializedResult, {
>   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>     .createSerializer(new ExecutionConfig)
> })
>
> I have not checked it, but now I suspect this code would work:
>
> KvStateRequestSerializer.deserializeValue(serializedResult, {
>   TypeInformation.of(new
>     TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
>     .createSerializer(new ExecutionConfig)
> })
>
> Regarding removing the queryable list state, I agree; using it seems
> pointless. Moreover, while removing it I would take a second look at these
> functions:
>
> KvStateRequestSerializer#deserializeList
> KvStateRequestSerializer#serializeList
>
> as I think they are not used at all even now. Thanks for your time.
>
> Regards
> Dawid Wysakowicz
>
> 2017-01-16 13:25 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > Hi Dawid,
> > regarding the original code, I couldn't reproduce this with the Java code I
> > wrote, and my guess is that the second parameter of the ListStateDescriptor
> > is wrong:
> >
> > .asQueryableState(
> >   "type-time-series-count",
> >   new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >     "type-time-series-count",
> >     classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > This should rather be
> >
> > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> >
> > as in the query itself. It sounds strange to me that you don't get any
> > ClassCastException or a compile-time error due to the type being wrong, but
> > I lack the Scala knowledge to get to the bottom of this.
> >
> >
> > Regarding the removal of the queryable list state "sink", I created a JIRA
> > issue for it and will open a PR:
> > https://issues.apache.org/jira/browse/FLINK-5507
> >
> >
> > Nico
> >
> > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > Hi Nico,
> > >
> > > Recently I've tried the queryable state a bit differently, by using
> > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > util.ArrayList, and it works as expected.
> > >
> > > You can browse the non-working example here:
> > > https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> > > and the working example here:
> > > https://github.com/dawidwys/flink-intro/tree/master
> > > (The QueryableJob is in the module flink-queryable-job and the QueryClient
> > > in flink-state-server.)
> > >
> > > Sure, I am aware of the drawback of the ListState. I need it just for
> > > presentation purposes, but you may be right that there might not be any
> > > production use for this state and that it should be removed.
> > > Maybe the problem is just with the ListState, and removing it would
> > > also resolve my problem :)
> > >
> > > Regards
> > > Dawid Wysakowicz
> > >
> > > 2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > > > Hi Dawid,
> > > > I'll try to reproduce the error in the next couple of days. Can you also
> > > > share the value deserializer you use? Also, have you tried even smaller
> > > > examples in the meantime? Did they work?
> > > >
> > > > As a general side note regarding the queryable state "sink" using
> > > > ListState (".asQueryableState(<name>, ListStateDescriptor)"): everything
> > > > that enters this operator will be stored forever and never cleaned up.
> > > > Eventually, it will pile up and use too much memory, and it is thus of
> > > > limited use. Maybe it should even be removed from the API.
> > > >
> > > >
> > > > Nico
> > > >
> > > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > > Hey Ufuk,
> > > > > did you maybe have a moment to look at that problem?
> > > > >
> > > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > > > the course of the day. From a first impression, this seems like a bug
> > > > > > to me.
> > > > > >
> > > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > > > <wysakowicz.da...@gmail.com> wrote:
> > > > > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > > > > some problems querying the state.
> > > > > > >
> > > > > > > The code which I use to produce the queryable state is:
> > > > > > >
> > > > > > > env.addSource(kafkaConsumer).map(
> > > > > > >   e => e match {
> > > > > > >     case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > >     case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > >     case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > > > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > > > >   .keyBy("key")
> > > > > > >   .asQueryableState(
> > > > > > >     "type-time-series-count",
> > > > > > >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > > >       "type-time-series-count",
> > > > > > >       classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > > >
> > > > > > > As you can see, it is a rather simple job in which I try to count
> > > > > > > events of different types in windows and then query by event type.
> > > > > > >
> > > > > > > In the client code I do:
> > > > > > >
> > > > > > >   // Query Flink state
> > > > > > >   val future = client.getKvState(jobId, "type-time-series-count",
> > > > > > >     key.hashCode, seralizedKey)
> > > > > > >
> > > > > > >   // Await async result
> > > > > > >   val serializedResult: Array[Byte] = Await.result(
> > > > > > >     future, new FiniteDuration(10, duration.SECONDS))
> > > > > > >
> > > > > > >   // Deserialize response
> > > > > > >   val results = deserializeResponse(serializedResult)
> > > > > > >
> > > > > > >   results
> > > > > > > }
> > > > > > >
> > > > > > > private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > > >     util.List[KeyedDataPoint[lang.Integer]] = {
> > > > > > >   KvStateRequestSerializer.deserializeList(serializedResult,
> > > > > > >     getValueSerializer())
> > > > > > > }
> > > > > > >
> > > > > > > While debugging the issue, I see that the first element in the list
> > > > > > > gets deserialized correctly, but deserialization fails on the second
> > > > > > > one. It seems like the serialized result is broken. Do you have any
> > > > > > > idea whether I am doing something wrong or whether there is a bug?
> > > > > > >
> > > > > > > The exception I get is:
> > > > > > >
> > > > > > > java.io.EOFException: null
> > > > > > >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > > >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > > >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > > >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > > >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > > >
> > > > > > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > > > > > >
> > > > > > > I would be grateful for any advice.
> > > > > > >
> > > > > > > Regards
> > > > > > > Dawid Wysakowicz
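Dawid's symptom (the first list element decodes, the second fails with an EOFException raised inside readUTF) is what any disagreement between how a list's bytes were written and how the reader frames them tends to produce. The stand-alone Java sketch below reproduces that symptom without Flink. It assumes, as a simplification of Flink 1.2's list format rather than something stated in this thread, that elements are written back to back with a single separator byte between them and that the reader skips one byte between elements. DataPoint, serializeList(..., withSeparators), deserializeListInto and the ',' separator value are all hypothetical. In this thread the actual cause was the FLINK-5642 race, which damages the bytes in a different way; the sketch only shows why such damage tends to surface on the second element.

```java
import java.io.*;
import java.util.*;

public class ListFramingSketch {

    // Hypothetical stand-in for KeyedDataPoint[Integer]: key, timestamp, value.
    static final class DataPoint {
        final String key;
        final long timestamp;
        final int value;

        DataPoint(String key, long timestamp, int value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof DataPoint)) return false;
            DataPoint p = (DataPoint) o;
            return key.equals(p.key) && timestamp == p.timestamp && value == p.value;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp, value);
        }
    }

    // Assumed one-byte separator between elements (illustrative value only).
    static final int SEPARATOR = ',';

    // Hypothetical writer: elements back to back, optionally separated by SEPARATOR.
    // withSeparators = false models a writer whose framing disagrees with the reader.
    static byte[] serializeList(List<DataPoint> points, boolean withSeparators) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < points.size(); i++) {
                if (i > 0 && withSeparators) {
                    out.writeByte(SEPARATOR);
                }
                DataPoint p = points.get(i);
                out.writeUTF(p.key);        // 2-byte length + modified UTF-8 bytes
                out.writeLong(p.timestamp);
                out.writeInt(p.value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Hypothetical reader: decode an element, then skip one separator byte if more
    // data follows. Decoded elements go into `out`, so a caller can see how far
    // decoding got before an exception.
    static void deserializeListInto(byte[] data, List<DataPoint> out) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        while (in.available() > 0) {
            String key = in.readUTF();
            long timestamp = in.readLong();
            int value = in.readInt();
            out.add(new DataPoint(key, timestamp, value));
            if (in.available() > 0) {
                in.readByte(); // separator
            }
        }
    }

    static List<DataPoint> deserializeList(byte[] data) throws IOException {
        List<DataPoint> result = new ArrayList<>();
        deserializeListInto(data, result);
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<DataPoint> points = Arrays.asList(
                new DataPoint("login", 1000L, 3),
                new DataPoint("login", 2000L, 5));

        // Matching framing: the round trip returns the original list.
        System.out.println(deserializeList(serializeList(points, true)).equals(points)); // true

        // Mismatched framing: element one decodes, then the reader treats the first
        // byte of element two as a separator and reads a bogus string length.
        List<DataPoint> decoded = new ArrayList<>();
        try {
            deserializeListInto(serializeList(points, false), decoded);
        } catch (EOFException e) {
            System.out.println("EOFException after " + decoded.size() + " element(s)"); // EOFException after 1 element(s)
        }
    }
}
```

With matching framing the round trip returns the original list. With the separator dropped, the reader consumes the first byte of element two as a "separator", decodes a string length of 1388 from the next two bytes, and readUTF runs off the end of the buffer after exactly one element has been decoded.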
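Nico's suggestion to build the descriptor from TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}) instead of classOf[KeyedDataPoint[java.lang.Integer]] comes down to erasure. In Scala as in Java, that classOf evaluates at runtime to the plain KeyedDataPoint class object. It type-checks against the ListStateDescriptor(String name, Class<T> typeClass) constructor Dawid mentions, so no compile-time error is to be expected, and the Integer argument is gone once the program runs. The trailing {} on a TypeHint creates an anonymous subclass, and an anonymous subclass does record its full generic supertype. The stand-alone Java sketch below shows that mechanism with hypothetical TypeCapture and KeyedDataPoint stand-ins. It is not Flink's TypeHint implementation and does not show which serializer Flink derives from either form.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureSketch {

    // Hypothetical stand-in for the thread's KeyedDataPoint[T] type.
    static class KeyedDataPoint<T> {
        String key;
        long timestamp;
        T value;
    }

    // Minimal "super type token": subclass it anonymously (note the trailing {})
    // and the constructor reads the type argument recorded for that subclass.
    abstract static class TypeCapture<T> {
        final Type captured;

        protected TypeCapture() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    public static void main(String[] args) {
        // A class literal is erased: nothing in it says that `value` holds an Integer.
        Class<?> erased = KeyedDataPoint.class;
        System.out.println("class literal: " + erased.getTypeName());

        // The anonymous subclass keeps the full parameterized type.
        Type full = new TypeCapture<KeyedDataPoint<Integer>>() {}.captured;
        ParameterizedType parameterized = (ParameterizedType) full;
        System.out.println("raw type:      " + parameterized.getRawType().getTypeName());
        System.out.println("type argument: "
                + parameterized.getActualTypeArguments()[0].getTypeName()); // java.lang.Integer
    }
}
```

The design point is that the type argument lives in the anonymous subclass's generic signature, which is why the {} in Nico's TypeHint snippet cannot be dropped: without it there is no subclass to carry KeyedDataPoint<Integer>.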