Hi Dawid,
sorry for the late reply. I was fixing some issues in queryable state and may now have found the cause of your error: you may be hitting a race condition with the MemoryStateBackend (the default state backend), as described in https://issues.apache.org/jira/browse/FLINK-5642. I'm currently working on a fix.
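For illustration only: the symptom in the original report (the first element decodes, then the second fails with an EOFException thrown from readUTF() -> readFully()) is what any byte-level mismatch between the writer and the reader of a serialized list produces. The stdlib-only Java sketch below does not model the race itself. It assumes a reader that skips one separator byte between elements and feeds it bytes written without separators, as a hypothetical stand-in for an inconsistent response. The class and method names, and the use of writeUTF strings instead of the real POJO serializer, are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative only: a reader expecting one separator byte between elements
 * is fed bytes written WITHOUT separators (a hypothetical mismatch).
 * Strings via writeUTF/readUTF stand in for the real POJO serializer.
 */
public class MisalignedListSketch {

    /** What the reader managed to decode, and whether it ran off the end. */
    record Outcome(List<String> decoded, boolean hitEof) {}

    /** Writer side: elements back to back, no separator bytes. */
    static byte[] encodeWithoutSeparators(List<String> elements) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (String element : elements) {
                out.writeUTF(element); // 2-byte length prefix + modified UTF-8 bytes
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reader side: skips one byte between elements, as a separator-aware reader would. */
    static Outcome decodeExpectingSeparators(byte[] serialized) {
        List<String> decoded = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            while (in.available() > 0) {
                decoded.add(in.readUTF());
                if (in.available() > 0) {
                    // Here this consumes the high byte of the NEXT length prefix,
                    // so the following readUTF() sees a bogus length.
                    in.readByte();
                }
            }
            return new Outcome(decoded, false);
        } catch (EOFException e) {
            // readUTF() -> readFully() runs out of bytes, as in the reported trace.
            return new Outcome(decoded, true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encodeWithoutSeparators(List.of("login", "logout"));
        System.out.println(decodeExpectingSeparators(wire));
    }
}
```

Running main() should print Outcome[decoded=[login], hitEof=true]: the skipped byte is really part of the second element's length prefix, so the next readUTF() asks for far more bytes than remain.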
KvStateRequestSerializer#deserializeList(), however, is the right function to
deserialise list state; KvStateRequestSerializer#deserializeValue() will not
work.
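A rough sketch of why list state needs its own decoder, assuming the 1.2 layout in which deserializeList() reads serialized elements back to back and skips a single separator byte between consecutive elements, a layout a plain value deserializer knows nothing about. The stdlib-only Java below mirrors that loop; the ',' separator value, the class and method names, and writeUTF strings in place of a real TypeSerializer are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of a separator-delimited list encoding (assumed layout):
 * elements written back to back with one separator byte in between.
 * Strings via writeUTF stand in for a real TypeSerializer.
 */
public class ListFormatSketch {
    // Hypothetical separator value; the reader skips it without checking it.
    static final int SEPARATOR = ',';

    static byte[] encodeList(List<String> elements) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < elements.size(); i++) {
                if (i > 0) {
                    out.writeByte(SEPARATOR); // one byte between elements, none at the end
                }
                out.writeUTF(elements.get(i));
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> decodeList(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            List<String> result = new ArrayList<>();
            while (in.available() > 0) {
                result.add(in.readUTF()); // one element
                if (in.available() > 0) {
                    in.readByte();        // skip the separator
                }
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = encodeList(List.of("login", "logout", "button"));
        System.out.println(decodeList(wire)); // [login, logout, button]
    }
}
```

No separator follows the last element, which is why the reader only skips a byte while input remains.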
Thanks for the tip regarding KvStateRequestSerializer#serializeList; it was
indeed not used, since the list state backends had their own serialisation
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable
list state sink for 1.2 and up.
Nico
On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> Hi Nico, Ufuk,
>
> Thanks for diving into this issue.
>
> @Nico
>
> I don't think that's the problem. The code can be reproduced exactly in
> Java. I am using a different ListStateDescriptor constructor than you did:
>
> You used:
> > public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
>
> While I used:
> > public ListStateDescriptor(String name, Class<T> typeClass)
>
> I think the problem is the way I deserialized the value on the
> QueryClient side, as I tried to use:
>
>   KvStateRequestSerializer.deserializeList(serializedResult, {
>     TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>       .createSerializer(new ExecutionConfig)
>   })
>
> I have not checked it, but now I suspect this code would work:
>
>   KvStateRequestSerializer.deserializeValue(serializedResult, {
>     TypeInformation.of(new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
>       .createSerializer(new ExecutionConfig)
>   })
>
> Regarding removing the queryable list state, I agree; using it seems
> pointless. Moreover, while removing it, I would take a second look at these
> functions:
>
>   KvStateRequestSerializer.deserializeList
>   KvStateRequestSerializer.serializeList
>
> as I think they are not used at all, even now. Thanks for your time.
>
> Regards
> Dawid Wysakowicz
>
> 2017-01-16 13:25 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > Hi Dawid,
> > regarding the original code, I couldn't reproduce this with the Java code I
> > wrote, and my guess is that the second parameter of the ListStateDescriptor
> > is wrong:
> >
> >   .asQueryableState(
> >     "type-time-series-count",
> >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >       "type-time-series-count",
> >       classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > This should rather be
> >
> >   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> >
> > as in the query itself. It seems strange to me that you don't get any
> > ClassCastException or a compile-time error due to the wrong type, but I
> > lack the Scala knowledge to get to the bottom of this.
> >
> >
> > Regarding the removal of the queryable list state "sink", I created a JIRA
> > issue for it and will open a PR:
> > https://issues.apache.org/jira/browse/FLINK-5507
> >
> >
> > Nico
> >
> > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > Hi Nico,
> > >
> > > Recently I tried queryable state a bit differently, using a ValueState
> > > holding a util.ArrayList with a ValueSerializer for util.ArrayList, and
> > > it works as expected.
> > >
> > > The non-working example you can browse here:
> > > https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> > > The working example here:
> > > https://github.com/dawidwys/flink-intro/tree/master
> > > (The QueryableJob is in module flink-queryable-job and the QueryClient is
> > > in flink-state-server.)
> > >
> > > Sure, I am aware of the downside of the ListState. I need it just for
> > > presentation purposes, but you may be right that there might not be any
> > > production use for this state and it should be removed.
> > > Maybe the problem is just with the ListState, and removing it would also
> > > resolve my problem :)
> > >
> > > Regards
> > > Dawid Wysakowicz
> > >
> > > 2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > > > Hi Dawid,
> > > > I'll try to reproduce the error in the next couple of days. Can you also
> > > > share the value deserializer you use? Also, have you tried even smaller
> > > > examples in the meantime? Did they work?
> > > >
> > > > As a general side note regarding the queryable state "sink" using
> > > > ListState (".asQueryableState(<name>, ListStateDescriptor)"): everything
> > > > that enters this operator will be stored forever and never cleaned up.
> > > > Eventually, it will take up too much memory and is thus of limited use.
> > > > Maybe it should even be removed from the API.
> > > >
> > > >
> > > > Nico
> > > >
> > > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > > Hey Ufuk,
> > > > > did you maybe have a moment to look at that problem?
> > > > >
> > > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > > > the course of the day. From a first impression, this seems like a
> > > > > > bug to me.
> > > > > >
> > > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
> > > > > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > > > > some problems querying the state.
> > > > > > >
> > > > > > > The code which I use to produce the queryable state is:
> > > > > > >
> > > > > > >   env.addSource(kafkaConsumer).map(
> > > > > > >     e => e match {
> > > > > > >       case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > >       case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > >       case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > > > >     }).keyBy(0).timeWindow(Time.seconds(1))
> > > > > > >     .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > > > >     .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > > > >     .keyBy("key")
> > > > > > >     .asQueryableState(
> > > > > > >       "type-time-series-count",
> > > > > > >       new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > > >         "type-time-series-count",
> > > > > > >         classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > > >
> > > > > > > As you can see, it is a rather simple job in which I try to count
> > > > > > > events of different types in windows and then query by event type.
> > > > > > >
> > > > > > > In client code I do:
> > > > > > >
> > > > > > >     // Query Flink state
> > > > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > > > >       key.hashCode, seralizedKey)
> > > > > > >
> > > > > > >     // Await async result
> > > > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > > > > >
> > > > > > >     // Deserialize response
> > > > > > >     val results = deserializeResponse(serializedResult)
> > > > > > >
> > > > > > >     results
> > > > > > >   }
> > > > > > >
> > > > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > > > > >       getValueSerializer())
> > > > > > >   }
> > > > > > >
> > > > > > > While debugging the issue, I saw that the first element in the list
> > > > > > > gets deserialized correctly, but deserialization fails on the second
> > > > > > > one. It seems like the serialized result is broken. Do you have any
> > > > > > > idea whether I am doing something wrong or whether there is a bug?
> > > > > > >
> > > > > > >
> > > > > > > The exception I get is:
> > > > > > >
> > > > > > >   java.io.EOFException: null
> > > > > > >     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > > >     at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > > >     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > > >     at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > > >
> > > > > > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > > > > > >
> > > > > > > I would be grateful for any advice.
> > > > > > >
> > > > > > > Regards
> > > > > > > Dawid Wysakowicz
