Dear Flink Users,
I’m getting started with Flink and I’ve bumped into a small problem.  I have a 
keyed stream like this:

val stream = env.addSource(consumer)
  .flatMap(new ValidationMap()).name("ValidationMap")
  .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz()))
  .flatMap(new Calculator(this.config.size, this.config.queryableStateName))
  .name(jobname)


Within my stream I have a ValueState that I use to maintain a list.

I then query that state with the QueryableStateClient:
client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), serializedKey);

Here “serializedKey” matches the key used in the .keyBy on the keyed stream.
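
One thing worth ruling out, as an assumption rather than a confirmed diagnosis: if serializedKey is a byte array, then serializedKey.hashCode() is the JVM's identity-based array hash, not a hash of the key's value. It would then generally differ from the hash of the tuple used in .keyBy, and could differ from one client run to the next. The sketch below uses only the Scala/Java standard library; the key values and the "foo|bar|baz" byte encoding are hypothetical stand-ins, not Flink's actual serialization.

```scala
object KeyHashSketch {
  // Hypothetical key values standing in for (foo, bar, baz) from the keyBy.
  val key = ("foo", "bar", "baz")
  val sameKey = ("foo", "bar", "baz")

  // Hypothetical stand-in for a serialized key: two distinct arrays
  // holding identical bytes.
  val bytesA: Array[Byte] = "foo|bar|baz".getBytes("UTF-8")
  val bytesB: Array[Byte] = "foo|bar|baz".getBytes("UTF-8")

  def main(args: Array[String]): Unit = {
    // Tuples hash by value, so equal keys produce equal hash codes.
    println(s"tuple hashes equal:   ${key.hashCode == sameKey.hashCode}")

    // Arrays inherit Object.hashCode, which does not depend on the contents,
    // so these two values are not guaranteed to agree.
    println(s"array identity hashes: ${bytesA.hashCode} vs ${bytesB.hashCode}")

    // A content-based hash of the bytes is stable, but it is still a
    // different number from the tuple's hash code.
    println(s"array content hashes:  ${java.util.Arrays.hashCode(bytesA)} vs ${java.util.Arrays.hashCode(bytesB)}")
  }
}
```

If this turns out to apply, passing the hash code of the key object itself (here, key.hashCode) as the third argument would be the variant to test.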

When I query the state, things go wrong.  As far as I can tell, the JobManager 
sends my query to just one of the three TaskManagers I have running, so about 
1/3 of the time I get the proper result and the other 2/3 of the time I get:

org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not 
hold any state for key/namespace.

I feel like I must have somehow misconfigured my job.  How can I instruct the 
JobManager to query the TaskManager that actually holds my data?  Is there a 
specific setting or configuration I’m missing?

Thank you for your time.

-Phil
