Hi Dawid,
sorry for the late reply. I was fixing some issues in queryable state and may
now have found the cause of your error: you may be seeing a race condition
with the MemoryStateBackend (the default state backend), as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList() is, however, the right function to
de-serialise list state; KvStateRequestSerializer#deserializeValue() will
not work.
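
To illustrate why the two calls are not interchangeable, here is a minimal,
self-contained Scala sketch. It does not use Flink at all. It assumes, purely
for illustration, that a list is encoded as its elements serialised one after
another with a single separator byte in between. The object name
ListFormatSketch, the helpers encodeList, decodeList and decodeValue, and the
separator value are hypothetical and not part of any Flink API; the real wire
format may differ.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

// Illustration only: a made-up list encoding, not Flink's implementation.
object ListFormatSketch {
  // Hypothetical separator byte placed between two elements (assumption).
  val Separator: Byte = ','.toByte

  /** Writes each string with writeUTF and one separator byte between elements. */
  def encodeList(items: Seq[String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    items.zipWithIndex.foreach { case (item, i) =>
      if (i > 0) out.writeByte(Separator)
      out.writeUTF(item)
    }
    out.flush()
    bytes.toByteArray
  }

  /** List-style decoding: keeps reading elements until the buffer is exhausted. */
  def decodeList(data: Array[Byte]): List[String] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = ArrayBuffer.empty[String]
    while (in.available() > 0) {
      result += in.readUTF()
      // Skip the separator if another element follows.
      if (in.available() > 0) in.readByte()
    }
    result.toList
  }

  /** Value-style decoding: reads exactly one element and ignores the rest. */
  def decodeValue(data: Array[Byte]): String =
    new DataInputStream(new ByteArrayInputStream(data)).readUTF()
}
```

With the buffer from encodeList(Seq("login", "logout", "button")), decodeList
returns all three strings, while decodeValue only sees the first one. If the
buffer is cut short, decodeList in this sketch fails with a
java.io.EOFException raised by readUTF.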

Thanks for the tip regarding KvStateRequestSerializer#serializeList. It was
indeed unused, since the list state backends have their own serialisation
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable
list state sink for 1.2 and up.


Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> Hi Nico, Ufuk,
> 
> Thanks for diving into this issue.
> 
> @Nico
> 
> I don't think that's the problem. The code can be exactly reproduced in
> Java. I am using a different ListStateDescriptor constructor than you did:
> 
> You used:
> > public ListStateDescriptor(String name, TypeInformation<T> typeInfo)
> 
> While I used:
> >  public ListStateDescriptor(String name, Class<T> typeClass)
> 
> I think the problem is with the way I deserialized the value on the
> QueryClient side as I tried to use:
> 
> 
> 
> KvStateRequestSerializer.deserializeList(serializedResult, {
>       TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
>         .createSerializer(new ExecutionConfig)
>     })
> 
> I have not checked it, but now I suspect this code would work:
> > KvStateRequestSerializer.deserializeValue(serializedResult, {
> >       TypeInformation.of(
> >         new TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> >         .createSerializer(new ExecutionConfig)
> >     })
> 
> Regarding removing the queryable state list I agree, using it seems
> pointless. Moreover, while removing it I would take a second look at these
> functions:
> > KvStateRequestSerializer.deserializeList
> > KvStateRequestSerializer.serializeList
>
> as I think they are not used at all even right now. Thanks for your time.
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-16 13:25 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > Hi Dawid,
> > regarding the original code, I couldn't reproduce this with the Java code
> > I wrote and my guess is that the second parameter of the
> > ListStateDescriptor is wrong:
> >       .asQueryableState(
> >         "type-time-series-count",
> >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >           "type-time-series-count",
> >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > 
> > this should rather be
> > 
> > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> >
> > as in the query itself. It sounds strange to me that you don't get any
> > ClassCastException or a compile-time error due to the type being wrong, but
> > I lack the Scala knowledge to get to the bottom of this.
> > 
> > 
> > Regarding the removal of the queryable list state "sink", I created a JIRA
> > issue for it and will open a PR:
> > https://issues.apache.org/jira/browse/FLINK-5507
> > 
> > 
> > Nico
> > 
> > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > Hi Nico,
> > > 
> > > Recently I've tried the queryable state a bit differently, by using
> > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > util.ArrayList and it works as expected.
> > > 
> > > The non-working example you can browse here:
> > > https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
> > > The working example here:
> > > https://github.com/dawidwys/flink-intro/tree/master
> > > (The QueryableJob is in module flink-queryable-job and the QueryClient
> > > in flink-state-server)
> > > 
> > > Sure, I am aware of the drawbacks of the ListState. I need it just for
> > > presentation purposes, but you may be right that there might not be any
> > > production use for this state and it should be removed.
> > > Maybe the problem is just with the ListState and removing it would
> > > also resolve my problem :)
> > > 
> > > Regards
> > > Dawid Wysakowicz
> > > 
> > > 2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:
> > > > Hi Dawid,
> > > > I'll try to reproduce the error in the next couple of days. Can you
> > > > also share the value deserializer you use? Also, have you tried even
> > > > smaller examples in the meantime? Did they work?
> > > > 
> > > > As a side-note in general regarding the queryable state "sink" using
> > > > ListState (".asQueryableState(<name>, ListStateDescriptor)"):
> > > > everything that enters this operator will be stored forever and never
> > > > cleaned. Eventually, it will pile up too much memory and is thus of
> > > > limited use. Maybe it should even be removed from the API.
> > > > 
> > > > 
> > > > Nico
> > > > 
> > > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > > Hey Ufuk.
> > > > > Did you maybe have a moment to look at that problem?
> > > > > 
> > > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > > > > Hey Dawid! Thanks for reporting this. I will try to have a look
> > > > > > over the course of the day. From a first impression, this seems
> > > > > > like a bug to me.
> > > > > >
> > > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > > > <wysakowicz.da...@gmail.com> wrote:
> > > > > > > Hi, I was experimenting with the Queryable State feature and I
> > > > > > > have some problems querying the state.
> > > > > > > 
> > > > > > > The code which I use to produce the queryable state is:
> > > > > > >     env.addSource(kafkaConsumer).map(
> > > > > > >       e => e match {
> > > > > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > > > > >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > > > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > > > > >       .keyBy("key")
> > > > > > >       .asQueryableState(
> > > > > > >         "type-time-series-count",
> > > > > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > > >           "type-time-series-count",
> > > > > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > > > 
> > > > > > > As you see it is a rather simple job, in which I try to count
> > > > > > > events of different types in windows and then query by event type.
> > > > > > > 
> > > > > > > In client code I do:
> > > > > > >     // Query Flink state
> > > > > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > > > > >       key.hashCode, seralizedKey)
> > > > > > >
> > > > > > >     // Await async result
> > > > > > >     val serializedResult: Array[Byte] = Await.result(
> > > > > > >       future, new FiniteDuration(
> > > > > > >         10,
> > > > > > >         duration.SECONDS))
> > > > > > >
> > > > > > >     // Deserialize response
> > > > > > >     val results = deserializeResponse(serializedResult)
> > > > > > >
> > > > > > >     results
> > > > > > >   }
> > > > > > >
> > > > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > > > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > > > > >       getValueSerializer())
> > > > > > >   }
> > > > > > > 
> > > > > > > As I was trying to debug the issue I see the first element in the
> > > > > > > list gets deserialized correctly, but it fails on the second one.
> > > > > > > It seems like the serialized result is broken. Do you have any
> > > > > > > idea if I am doing something wrong or there is some bug?
> > > > > > > 
> > > > > > > 
> > > > > > > The exception I get is:
> > > > > > > java.io.EOFException: null
> > > > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > > > > at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > > > > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > > > > at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > > > > at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > > > > >
> > > > > > 
> > > > > > > You can browse the exact code at:
> > > > > > > https://github.com/dawidwys/flink-intro
> > > > > > >
> > > > > > 
> > > > > > > I would be grateful for any advice.
> > > > > > > 
> > > > > > > Regards
> > > > > > > Dawid Wysakowicz
