Hi,

I was experimenting with the Queryable State feature and I am having problems querying the state.
The code I use to produce the queryable state is:

    env.addSource(kafkaConsumer).map(e => e match {
        case LoginClickEvent(_, t) => ("login", 1, t)
        case LogoutClickEvent(_, t) => ("logout", 1, t)
        case ButtonClickEvent(_, _, t) => ("button", 1, t)
      }).keyBy(0).timeWindow(Time.seconds(1))
      .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
      .keyBy("key")
      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

As you can see, it is a rather simple job: I count events of each type in one-second windows and then try to query the counts by event type.

In the client code I do:

      // Query Flink state
      val future = client.getKvState(jobId, "type-time-series-count", key.hashCode, seralizedKey)

      // Await async result
      val serializedResult: Array[Byte] = Await.result(
        future, new FiniteDuration(10, duration.SECONDS))

      // Deserialize response
      val results = deserializeResponse(serializedResult)

      results
    }

    private def deserializeResponse(serializedResult: Array[Byte]): util.List[KeyedDataPoint[lang.Integer]] = {
      KvStateRequestSerializer.deserializeList(serializedResult, getValueSerializer())
    }

While debugging the issue, I saw that the first element in the list is deserialized correctly, but deserialization fails on the second one. It seems like the serialized result is broken. Do you have any idea whether I am doing something wrong, or whether this is a bug?
The exception I get is:

    java.io.EOFException: null
        at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
        at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
        at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
        at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)

You can browse the exact code at: https://github.com/dawidwys/flink-intro

I would be grateful for any advice.

Regards
Dawid Wysakowicz