Ufuk - thank you for your help. My flink-conf.yaml is now configured cluster-wide (with restart) as:
# my flink-conf.yaml, on all flink nodes:
jobmanager.rpc.address: x.x.x.x
jobmanager.rpc.port: 6123
query.server.port: 6123
query.server.enable: true
When I issue my query with the above settings, I get the error below (the same query worked on a single Flink node running by itself):
2017/04/24 07:08:26.940 ERROR [OneForOneStrategy] Error while decoding incoming Akka PDU of length: 3623
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 3623
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167)
    at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:621)
    at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:372)
    at akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:367)
My client code is below. I verified that the job ID exists and the job is running, and that the stateName exists and is populated. Again, this code works against a single standalone Flink node.
// server = job manager ip, which I can route to, and responds on port 6123
// port = 6123
private static QueryableStateClient newQueryableStateClient(String server, int port) {
    Configuration configFlink = new Configuration();
    configFlink.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, server);
    configFlink.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
    try {
        client = new QueryableStateClient(configFlink);
        return client;
    }
    catch (Exception e) {
        logger.error("Error configuring QueryableStateGateway: "+e);
        return null;
    }
}
public HashMap<Long, java.util.ArrayList<String>> executeQuery(Tuple2<String, String> key, String flinkJobID, String stateName) {
    JobID jobId = JobID.fromHexString(flinkJobID);
    byte[] serializedKey = getSeralizedKey(key);
    Future<byte[]> future = client.getKvState(jobId, stateName, key.hashCode(), serializedKey);
    try {
        byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
        HashMap<Long, java.util.ArrayList<String>> results = deserializeResponseGlobalCoverage(serializedResult);
        return results;
    }
    catch (Exception e) {
        logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
        return null;
    }
}
Thank you for the help!
24.04.2017, 07:55, "Ufuk Celebi" <u...@apache.org>:
You should be able to use queryable state w/o any changes to the
default config. The `query.server.port` option defines the port of the
queryable state server that serves the state on the task managers and
it is enabled by default.
The important thing is to configure the client to discover the
JobManager and everything else should work out of the box. Can you
please
1) Use the default config and verify in the JobManager logs that the
JobManager listens on port 6123 (the default JM port) and that all
expected TaskManagers connect to it?
2) Share the code for how you configure the QueryableStateClient?
– Ufuk
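
Since `query.server.port` is the port of the queryable state server on the task managers and 6123 is the default JobManager port, one thing worth ruling out in the flink-conf.yaml quoted at the top of this thread is that both keys are set to the same number. Below is a minimal sketch in plain Java (no Flink dependency) that flags port values shared by more than one key. The class and method names are hypothetical, the parser only handles flat `key: value` lines, and nothing here establishes that a shared port is what causes the Akka decoding error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class PortCollisionCheck {

    /**
     * Groups config keys ending in ".port" by their value and returns only
     * those port values that are assigned to two or more keys.
     */
    public static Map<String, List<String>> findSharedPorts(List<String> configLines) {
        Map<String, List<String>> keysByPort = new LinkedHashMap<>();
        for (String line : configLines) {
            String trimmed = line.trim();
            // Skip blank lines and comments.
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int sep = trimmed.indexOf(':');
            if (sep < 0) {
                continue;
            }
            String key = trimmed.substring(0, sep).trim();
            String value = trimmed.substring(sep + 1).trim();
            if (key.endsWith(".port")) {
                keysByPort.computeIfAbsent(value, v -> new ArrayList<>()).add(key);
            }
        }
        // Keep only ports that more than one key points at.
        keysByPort.values().removeIf(keys -> keys.size() < 2);
        return keysByPort;
    }

    public static void main(String[] args) {
        // The settings as posted in this thread.
        List<String> conf = Arrays.asList(
                "# my flink-conf.yaml, on all flink nodes:",
                "jobmanager.rpc.address: x.x.x.x",
                "jobmanager.rpc.port: 6123",
                "query.server.port: 6123",
                "query.server.enable: true");
        for (Map.Entry<String, List<String>> e : findSharedPorts(conf).entrySet()) {
            System.out.println("port " + e.getKey() + " is used by " + e.getValue());
        }
    }
}
```

Run against the posted settings, this reports that `jobmanager.rpc.port` and `query.server.port` both use 6123. Removing the `query.server.*` lines and falling back to the default config, as suggested in step 1 above, would make the check come back empty.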