Hi, I was experimenting with the Queryable State feature and I am having some
problems querying the state.

The code which I use to produce the queryable state is:

    env.addSource(kafkaConsumer).map(
      e => e match {
        case LoginClickEvent(_, t) => ("login", 1, t)
        case LogoutClickEvent(_, t) => ("logout", 1, t)
        case ButtonClickEvent(_, _, t) => ("button", 1, t)
      }).keyBy(0).timeWindow(Time.seconds(1))
      .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
      .keyBy("key")
      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))

As you can see, it is a rather simple job, in which I try to count events of
different types in windows and then query by event type.
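
To make the intended result concrete, here is a minimal sketch of the same counting logic on plain Scala collections, without Flink. The event case classes and their field names are hypothetical stand-ins for the ones in the job. The sketch also assumes that timestamps are in milliseconds and buckets events by their own timestamp, whereas the job's `timeWindow` may be driven by processing time.

```scala
// Hypothetical event types standing in for the ones used in the job.
sealed trait ClickEvent
case class LoginClickEvent(user: String, timestamp: Long) extends ClickEvent
case class LogoutClickEvent(user: String, timestamp: Long) extends ClickEvent
case class ButtonClickEvent(user: String, button: String, timestamp: Long) extends ClickEvent

object WindowCountSketch {
  // Same mapping as the first map() in the job: (type, count, timestamp).
  def toTuple(e: ClickEvent): (String, Int, Long) = e match {
    case LoginClickEvent(_, t)     => ("login", 1, t)
    case LogoutClickEvent(_, t)    => ("logout", 1, t)
    case ButtonClickEvent(_, _, t) => ("button", 1, t)
  }

  // Group by (type, 1-second bucket), then reduce exactly like the job's reduce():
  // sum the counts and keep the latest timestamp.
  // Assumption: timestamps are milliseconds, so ts / 1000 is the bucket index.
  def countPerWindow(events: Seq[ClickEvent]): Map[(String, Long), (String, Int, Long)] =
    events
      .map(toTuple)
      .groupBy { case (tpe, _, ts) => (tpe, ts / 1000) }
      .map { case (k, vs) =>
        k -> vs.reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      }
}
```

Each resulting entry corresponds to one `KeyedDataPoint` that the job would append to the list state for that event type.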

In the client code I do:

    // Query Flink state
    val future = client.getKvState(jobId, "type-time-series-count",
      key.hashCode, seralizedKey)

    // Await async result
    val serializedResult: Array[Byte] = Await.result(
      future, new FiniteDuration(
        10,
        duration.SECONDS))

    // Deserialize response
    val results = deserializeResponse(serializedResult)

    results
  }

  private def deserializeResponse(
      serializedResult: Array[Byte]): util.List[KeyedDataPoint[lang.Integer]] = {
    KvStateRequestSerializer.deserializeList(
      serializedResult, getValueSerializer())
  }

While debugging the issue, I saw that the first element in the list gets
deserialized correctly, but deserialization fails on the second one. It seems
like the serialized result is broken. Do you have any idea whether I am doing
something wrong, or is there a bug?


The exception I get is:
java.io.EOFException: null
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
    at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
    at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
    at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
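
The trace ends in `readUTF` calling `readFully`, which is the pattern one gets when a length-prefixed string is read from a buffer that ends too early. The sketch below reproduces that symptom with the standard `java.io` streams only. It is an illustration of the failure mode, not of Flink's actual list format, and `TruncatedListSketch` is a hypothetical name. I am not claiming this is what happens inside `KvStateRequestSerializer`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object TruncatedListSketch {
  // Write each string with a 2-byte length prefix (DataOutput.writeUTF).
  def serialize(items: Seq[String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    items.foreach(out.writeUTF)
    out.flush()
    bytes.toByteArray
  }

  // Read `count` strings back. readUTF throws java.io.EOFException
  // if the buffer ends before the announced number of bytes is available.
  def deserialize(data: Array[Byte], count: Int): Seq[String] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    (1 to count).map(_ => in.readUTF())
  }
}
```

With `serialize(Seq("login", "logout"))` cut short by a few bytes, reading one element succeeds and reading the second throws `java.io.EOFException`, which matches what I see: the first element is fine and the second one fails.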

You can browse the exact code at: https://github.com/dawidwys/flink-intro

I would be grateful for any advice.

Regards
Dawid Wysakowicz
