Any insight here? I've got a situation where a key value state on a task manager is being registered with the job manager, but when I try to query it, the job manager responds it doesn't know the location of the key value state...
 
 
26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:
After setting the logging to DEBUG on the job manager, I learned four things:
 
(On the message formatting below, I have the Flink logs formatted into JSON so I can import them into Kibana)
 
1. The appropriate key value state is registered in both the parallelism = 1 and parallelism = 3 environments. With parallelism = 1, I saw one registration message in the log; with parallelism = 3, I saw two registration messages:
{"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"", "msg":"Key value state registered for job <job id> under name <statename>"}
 
2. When I issued the query in both the parallelism = 1 and parallelism = 3 environments, I saw "Lookup key-value state for job <job id> with registration name <statename>". With parallelism = 1, I saw one log message; with parallelism = 3, I saw two identical messages.
 
3. I saw no other messages in the job manager log that seemed relevant.
 
4. When issuing the query in parallelism = 3, I continued to get the error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message of null.
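
For readers following the thread, the exception name refers to key groups. Below is a minimal sketch, assuming Flink's scheme in which every key is hashed into one of maxParallelism key groups and each key group is owned by exactly one parallel subtask. The class and method names are made up for illustration, the hash is a plain stand-in (Flink applies a murmur hash to key.hashCode()), and a max parallelism of 128 is an assumption. If only two of three subtasks have registered their state, the key groups owned by the third would have no known location. That would be consistent with the error above, but the logs here do not confirm it is the cause.

```java
import java.util.ArrayList;
import java.util.List;

final class KeyGroupSketch {
    // Stand-in for Flink's key-group hash; Flink itself murmur-hashes key.hashCode().
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index of the parallel subtask that owns a given key group.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Key groups whose owning subtask has not registered its state.
    static List<Integer> unlocatedKeyGroups(int maxParallelism, int parallelism,
                                            boolean[] registered) {
        List<Integer> missing = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (!registered[operatorIndex(kg, maxParallelism, parallelism)]) {
                missing.add(kg);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;                   // assumed value
        int parallelism = 3;
        boolean[] registered = {true, true, false}; // hypothetical: two registrations seen
        List<Integer> missing = unlocatedKeyGroups(maxParallelism, parallelism, registered);
        System.out.println(missing.size() + " of " + maxParallelism
                + " key groups have no known location");
    }
}
```

Under these assumptions, only queries for keys that hash into the unregistered subtask's key groups would fail, so trying several different keys may help narrow things down.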
 
Thanks!
 
 
 
 
 
26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:

Thanks! Your config looks good to me.

Could you please set the log level org.apache.flink.runtime.jobmanager to DEBUG?

log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG

Then we can check whether the JobManager logs the registration of the
state instance with the respective name in the case of parallelism >
1?

Expected output is something like this: "Key value state registered
for job ${msg.getJobId} under name ${msg.getRegistrationName}."

– Ufuk

On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
<chet.master...@yandex.com> wrote:

 Ok...more information.

 1. Built a fresh cluster from the ground up. Started testing queryable state
 at each step.
 2. When running under any configuration of task managers and job managers
 where parallelism = 1, the queries execute as expected.
 3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
 manager) feeding off a kafka topic partitioned three ways, queries will
 always fail, returning error
 (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
 error message of null.
 4. I do know my state is as expected on the cluster. Liberal use of trace
 prints shows the state managed by the jobs is as I expect. However, I cannot
 query it externally.
 5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
 is configured by using the job manager UI.
 6. My flink-conf.yaml:

 jobmanager.rpc.address: flink01
 jobmanager.rpc.port: 6123
 jobmanager.heap.mb: 256

 taskmanager.heap.mb: 512
 taskmanager.data.port: 6121
 taskmanager.numberOfTaskSlots: 1
 taskmanager.memory.preallocate: false

 parallelism.default: 1
 blob.server.port: 6130

 jobmanager.web.port: 8081
 query.server.enable: true

 7. I do know my job is indeed running in parallel, from trace prints going
 to the task manager logs.

 Do I need a backend configured when running in parallel for the queryable
 state? Do I need a shared temp directory on the task managers?

 THANKS!


 25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:

 It's strange that the rpc port is set to 30000 when you use a
 standalone cluster and configure 6123 as the port. I'm pretty sure
 that the config has not been updated.

 But everything should work as you say when you point it to the correct
 jobmanager address and port. Could you please post the complete
 stacktrace you get instead of the message you log?
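
 One way to capture that, sketched under assumptions: when getMessage() returns
 null, the exception class and stack trace still carry the useful information.
 The class and method names below are made up for illustration, and
 IllegalStateException only stands in for whatever the query actually throws.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

final class ExceptionDetails {
    // Renders the exception class, message (if any), and stack trace as one string.
    static String describe(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Stand-in for an exception created without a message.
        Exception e = new IllegalStateException();
        System.out.println("getMessage(): " + e.getMessage());
        System.out.println(describe(e));
    }
}
```

 With log4j or SLF4J, passing the exception itself as the last argument, for
 example logger.error("Queryable State Error", e), should log the same details.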


 On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson
 <chet.master...@yandex.com> wrote:



  More information:

  0. I did remove the query.server.port and query.server.enable from all
  flink-conf.yaml files, and restarted the cluster.

  1. The Akka error doesn't seem to have anything to do with the problem. If I
  point my query client at an IP address with no Flink server running at all,
  I get that error. It seems to be a (side effect?) timeout for "no Flink
  service is listening on the port you told me to check".

  2. I did notice (using the Flink Web UI) that even with the config file changes
  in 0, and with the default jobmanager.rpc.port (6123) left unchanged in
  flink-conf.yaml, jobmanager.rpc.port on my cluster is set to 30000.

  3. If I do send a query using the jobmanager.rpc.address and the
  jobmanager.rpc.port as displayed in the Flink Web UI, the connection from
  the client to Flink is initiated and completed. When I try to execute
  the query (code below), it fails and the exception is caught. When I look at
  the error message returned (e.getMessage() below), it is simply 'null':

  try {
      byte[] serializedResult = Await.result(future,
              new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
      // de-serialize, commented out for testing
      return null;
  } catch (Exception e) {
      logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-"
              + stateName + " Error: " + e.getMessage());
      return null;
  }

  Should I be sending the query to the job manager on the job manager's
  rpc port when Flink is clustered?

  ALSO - I do know the state name I am trying to query exists, is populated,
  and the job id exists. I also know the task managers are communicating with
  the job managers (task managers data port: 6121) and processed the data that
  resulted in the state variable I am trying to query being populated. All
  this was logged.


  24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:

  Hey Chet! You can remove

  query.server.port: 6123
  query.server.enable: true

  That shouldn't cause the Exception we see here though. I'm actually
  not sure what is causing the PduCodecException. Could this be related
  to different Akka versions being used in Flink and your client code?
  [1] Is it possible for you to check this?

  – Ufuk

  [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
