Dear Flink Users, I’m getting started with Flink and I’ve bumped into a small problem. I have a keyed stream like this:
val stream = env.addSource(consumer) .flatMap(new ValidationMap()).name("ValidationMap") .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz())) .flatMap(new Calculator(this.config.size, this.config.queryableStateName)).name(jobname) Within my stream I have a ValueState that I use to maintain a list. I then use the QueryableStateClient to client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), serializedKey); Where the “serializedKey” matches the .keyBy on the keyed stream. When I query the state things go wrong. I’ve determined that the JobManager appears to send my query to one of the three TaskManagers I have running, so about 1/3 of the time I get the proper result and the other 2/3 of the time I get org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not hold any state for key/namespace. I feel like I must have somehow misconfigured my job, how can I instruct the job manager to properly query the TaskManager that has my data? Is there a specific setting or configuration I’m missing? Thank you for your time. -Phil