Hey Ufuk,
Did you have a chance to look at this problem?
2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> Hey Dawid! Thanks for reporting this. I will try to have a look over
> the course of the day. From a first impression, this seems like a bug
> to me.
>
> On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> <wysakowicz.da...@gmail.com> wrote:
> > Hi, I was experimenting with the Queryable State feature and I have some
> > problems querying the state.
> >
> > The code which I use to produce the queryable state is:
> >
> > env.addSource(kafkaConsumer).map(
> >     e => e match {
> >       case LoginClickEvent(_, t) => ("login", 1, t)
> >       case LogoutClickEvent(_, t) => ("logout", 1, t)
> >       case ButtonClickEvent(_, _, t) => ("button", 1, t)
> >     }).keyBy(0).timeWindow(Time.seconds(1))
> >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> >   .keyBy("key")
> >   .asQueryableState(
> >     "type-time-series-count",
> >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >       "type-time-series-count",
> >       classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > As you can see, it is a rather simple job in which I count events of
> > different types per window and then query the counts by event type.
> >
> > In the client code I do:
> > // Query Flink state
> > val future = client.getKvState(jobId, "type-time-series-count",
> >   key.hashCode, seralizedKey)
> >
> > // Await async result
> > val serializedResult: Array[Byte] = Await.result(
> >   future, new FiniteDuration(10, duration.SECONDS))
> >
> > // Deserialize response
> > val results = deserializeResponse(serializedResult)
> >
> > results
> > }
> >
> > private def deserializeResponse(serializedResult: Array[Byte]):
> >     util.List[KeyedDataPoint[lang.Integer]] = {
> >   KvStateRequestSerializer.deserializeList(serializedResult,
> >     getValueSerializer())
> > }
> >
> > While debugging the issue I saw that the first element in the list gets
> > deserialized correctly, but deserialization fails on the second one. It
> > seems like the serialized result is broken. Do you have any idea whether
> > I am doing something wrong or there is a bug?
> >
> >
> > The exception I get is:
> > java.io.EOFException: null
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> >
> > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> >
> > I would be grateful for any advice.
> >
> > Regards
> > Dawid Wysakowicz
>