Log4J

2017-02-15 Thread Chet Masterson
Is there a way to reload a log4j.properties file without stopping and starting the job server?
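One option worth testing (a sketch only, not something verified against a running Flink JobManager, which by default only reads the file at startup) is log4j 1.x's PropertyConfigurator.configureAndWatch, which polls the properties file and re-applies it when its modification time changes. The path and interval below are placeholders:

    import org.apache.log4j.PropertyConfigurator;

    public class ReloadableLogging {
        public static void main(String[] args) throws InterruptedException {
            // Hypothetical path; the second argument is the polling interval in ms.
            // A background watchdog thread re-applies the configuration whenever
            // the file changes on disk.
            PropertyConfigurator.configureAndWatch("/opt/flink/conf/log4j.properties", 30000L);

            // Keep the process alive so later edits to the file can be observed.
            Thread.sleep(Long.MAX_VALUE);
        }
    }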


Queryable State

2017-03-13 Thread Chet Masterson
Any guidance on troubleshooting the error "No KvStateLocation found for KvState instance with name X" when trying to make a queryable state call in Flink 1.2.0? I do know the server is receiving the call made from the remote client (query.server.enable: true on the server in flink-conf.yaml; that is the only setting I changed there). I'm following the guidelines documented here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/queryable_state.html under 'Managed Keyed State'. I do understand this feature is 'highly experimental', and my code is in Scala, so I am also following the instructions under 'Note for Scala Users' in the documentation.
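For reference, a minimal remote-client sketch in Java modelled on the 1.2 documentation page linked above; the host, port, state name ("myQueryableState"), and String key type are placeholder assumptions, and the class/package names should be double-checked against the exact 1.2.0 release:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.query.QueryableStateClient;
    import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
    import org.apache.flink.runtime.state.VoidNamespace;
    import org.apache.flink.runtime.state.VoidNamespaceSerializer;

    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;

    public class QueryableStateSketch {
        public static void main(String[] args) throws Exception {
            // Point the client at the JobManager (hypothetical host/port).
            Configuration config = new Configuration();
            config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "flink01");
            config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

            QueryableStateClient client = new QueryableStateClient(config);

            JobID jobId = JobID.fromHexString(args[0]); // job id as shown in the web UI
            String key = args[1];                       // a key the job has actually seen

            // Serialize key + namespace as the 1.2 docs describe.
            byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
                    key, StringSerializer.INSTANCE,
                    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

            Future<byte[]> future =
                    client.getKvState(jobId, "myQueryableState", key.hashCode(), serializedKey);

            byte[] serializedResult = Await.result(future, new FiniteDuration(10, TimeUnit.SECONDS));
            System.out.println("Received " + serializedResult.length + " bytes");
            // De-serialize with the same TypeSerializer the state descriptor uses.
        }
    }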


Re: Queryable State

2017-04-26 Thread Chet Masterson
Ok... more information.

1. Built a fresh cluster from the ground up. Started testing queryable state at each step.
2. When running under any configuration of task managers and job managers where parallelism = 1, the queries execute as expected.
3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job manager) feeding off a Kafka topic partitioned three ways, queries always fail, returning the error (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an error message of null.
4. I do know my state is as expected on the cluster. Liberal use of trace prints shows the state managed in the jobs is as I expect. However, I cannot query it externally.
5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed is configured by using the job manager UI.
6. My flink-conf.yaml:

    jobmanager.rpc.address: flink01
    jobmanager.rpc.port: 6123
    jobmanager.heap.mb: 256
    taskmanager.heap.mb: 512
    taskmanager.data.port: 6121
    taskmanager.numberOfTaskSlots: 1
    taskmanager.memory.preallocate: false
    parallelism.default: 1
    blob.server.port: 6130
    jobmanager.web.port: 8081
    query.server.enable: true

7. I do know my job is indeed running in parallel, from trace prints going to the task manager logs.

Do I need a backend configured when running in parallel for the queryable state? Do I need a shared temp directory on the task managers? THANKS!

25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:
It's strange that the rpc port is set to 3 when you use a standalone cluster and configure 6123 as the port. I'm pretty sure that the config has not been updated. But everything should work as you say when you point it to the correct jobmanager address and port. Could you please post the complete stacktrace you get instead of the message you log?

On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson <chet.master...@yandex.com> wrote:
More information:
0. I did remove the query.server.port and query.server.enabled from all flink-conf.yaml files, and restarted the cluster.
1. The Akka error doesn't seem to have anything to do with the problem. If I point my query client at an IP address with no Flink server running at all, I get that error. It seems to be a (side effect?) timeout for "no Flink service is listening on the port you told me to check".
2. I did notice (using the Flink Web UI) that even with the config file changes in 0, and with jobmanager.rpc.port left at its flink-conf.yaml default (6123), jobmanager.rpc.port is reported as 3 on my cluster.
3. If I do send a query using the jobmanager.rpc.address and the jobmanager.rpc.port as displayed in the Flink Web UI, the connection from the client to Flink is initiated and completed. When I try to execute the query (code below), it fails and is trapped. When I look at the error message returned (e.getMessage() below), it is simply 'null':

    try {
        byte[] serializedResult = Await.result(future, new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
        // de-serialize, commented out for testing
        return null;
    } catch (Exception e) {
        logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-" + stateName + " Error: " + e.getMessage());
        return null;
    }

Should I be sending the query to the job manager on the job manager's rpc port when Flink is clustered? ALSO - I do know the state name I am trying to query exists, is populated, and the job id exists. I also know the task managers are communicating with the job managers (task manager data port: 6121) and processed the data that resulted in the state variable I am trying to query being populated. All this was logged.

24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:
Hey Chet! You can remove

    query.server.port: 6123
    query.server.enable: true

That shouldn't cause the Exception we see here though. I'm actually not sure what is causing the PduCodecException. Could this be related to different Akka versions being used in Flink and your client code? [1] Is it possible for you to check this?

– Ufuk

[1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
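Side note on Ufuk's request for the complete stacktrace: the catch block quoted above only logs e.getMessage(), which is null here. A minimal tweak (same hypothetical variables key, flinkJobID and stateName as in that snippet) is to hand the exception itself to the logger so the full stack trace gets printed:

    } catch (Exception e) {
        // Passing the Throwable as the final argument makes the logger emit the
        // complete stack trace instead of just the (often null) message text.
        logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-" + stateName, e);
        return null;
    }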

Re: Queryable State

2017-04-26 Thread Chet Masterson
After setting the logging to DEBUG on the job manager, I learned four things (on the message formatting below: I have the Flink logs formatted into JSON so I can import them into Kibana):

1. The appropriate key value state is registered in both the parallelism = 1 and parallelism = 3 environments. In parallelism = 1, I saw one registration message in the log; in parallelism = 3, I saw two registration messages:

    {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"", "msg":"Key value state registered for job  under name "}

2. When I issued the query in both the parallelism = 1 and parallelism = 3 environments, I saw "Lookup key-value state for job  with registration name ". In parallelism = 1, I saw one log message; in parallelism = 3, I saw two identical messages.
3. I saw no other messages in the job manager log that seemed relevant.
4. When issuing the query in parallelism = 3, I continued to get the error org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message of null.

Thanks!

26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:
Thanks! Your config looks good to me. Could you please set the log level of org.apache.flink.runtime.jobmanager to DEBUG?

    log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG

Then we can check whether the JobManager logs the registration of the state instance with the respective name in the case of parallelism > 1. Expected output is something like this: "Key value state registered for job ${msg.getJobId} under name ${msg.getRegistrationName}."

– Ufuk
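For completeness, Ufuk's logger line goes into conf/log4j.properties on the job manager host; a minimal sketch, where the rootLogger line shown is assumed to be the stock default Flink ships with:

    # conf/log4j.properties (job manager host)
    log4j.rootLogger=INFO, file
    log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG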

Queryable State

2017-04-24 Thread Chet Masterson
I moved up from running queryable state on a standalone Flink instance to a several-node cluster. My queries don't seem to be responding when I execute them on the cluster. A few questions:

1. The error I am getting:

    WARN [ReliableDeliverySupervisor] Association with remote system [akka.tcp://flink@x.x.x.x:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@x.x.x.x:6123]] Caused by: [Connection refused: /x.x.x.x:6123]
    2017/04/23 20:19:01.016 ERROR Actor not found for: ActorSelection[Anchor(akka.tcp://flink@x.x.x.x:6123/), Path(/user/jobmanager)]

I assume this is because Flink is not servicing requests on port 6123. I am using the default RPC ports defined in flink-conf.yaml. I confirmed nothing is listening on port 6123 by using netstat on the Flink nodes.

2. I make the following changes on all nodes to flink-conf.yaml, then restart the cluster:

    #jobmanager.rpc.port: 6123
    query.server.port: 6123
    query.server.enable: true

3. Now port 6123 is open, as viewed from netstat.

My question: what is the proper configuration for servicing external queries when running in a cluster? Can I use jobmanager.rpc.port: 6123, which works standalone, or do I have to add query.server.port and query.server.enable? Which port should I be using?
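For what it's worth, the configuration this thread eventually converges on keeps the two ports separate: the remote client sends its queries to the JobManager's RPC port, query.server.enable stays on, and query.server.port is not forced to 6123. A sketch with a hypothetical host name:

    jobmanager.rpc.address: flink01   # hypothetical job manager host
    jobmanager.rpc.port: 6123         # the port the remote query client connects to
    query.server.enable: true         # KvState server on the task managers
    # query.server.port deliberately left unset; forcing it to 6123 collides with the RPC port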

Re: Queryable State

2017-04-28 Thread Chet Masterson
Any insight here? I've got a situation where a key value state on a task manager is being registered with the job manager, but when I try to query it, the job manager responds that it doesn't know the location of the key value state...

Re: Queryable State

2017-05-04 Thread Chet Masterson
I found the issue. When parallelism = 3, my test data set was skewed such that data was only going to two of the three task managers (Kafka partitions = 3, number of Flink nodes = 3, parallelism = 3). As soon as I created a test data set with enough keys to spread across all three task managers, queryable state started working as expected. That is also why only two KvStates were registered with the job manager, instead of three.

My FINAL :-) question: should I be getting org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event only N-1 task managers have data in a parallelism of N situation?

Thanks for all the help!

04.05.2017, 11:24, "Ufuk Celebi" <u...@apache.org>:
Could you try KvStateRegistry#registerKvState please? In the JM logs you should see something about the number of connected task managers, and in the task manager logs that each one connects to a JM.

– Ufuk
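A quick way to see this kind of skew coming is to check, per test key, which parallel subtask Flink routes it to. A minimal sketch, assuming the KeyGroupRangeAssignment utility from flink-runtime (the class used internally for key-group routing); the key values and parallelism are made-up test inputs:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyDistributionCheck {
        public static void main(String[] args) {
            int parallelism = 3;
            // Default max parallelism derived from the operator parallelism.
            int maxParallelism = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism);

            // Hypothetical test keys; with only a handful of keys it is quite
            // possible that one of the three subtasks receives nothing at all.
            String[] testKeys = {"alpha", "bravo", "charlie", "delta", "echo"};

            for (String key : testKeys) {
                int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
                System.out.println(key + " -> subtask " + subtask);
            }
        }
    }

A subtask that never sees a key never registers a KvState, which lines up with the two-out-of-three registrations observed earlier in the thread.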

Re: Queryable State

2017-05-02 Thread Chet Masterson
Can do. Any advice on where the trace prints should go in the task manager source code?

BTW - how do I know I have a correctly configured cluster? Is there a set of messages in the job / task manager logs that indicates all required connectivity is present? I know I can use the UI to make sure all the task managers are present, and that the job is running on all of them, but is there some verbiage in the logs that indicates the job manager can talk to all the task managers, and vice versa? Thanks!

02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:
Hey Chet! I'm wondering why you are only seeing 2 registration messages for 3 task managers. Unfortunately, there is no log message at the task managers when they send out the notification. Is it possible for you to run a remote debugger with the task managers or build a custom Flink version with the appropriate log messages on the task manager side?

– Ufuk
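On the "correctly configured cluster" question: besides grepping the logs, the monitoring REST API that backs the web UI can be polled from a script to confirm that every TaskManager has registered with the JobManager. A sketch, assuming the default jobmanager.web.port of 8081 and that a /taskmanagers endpoint is available in this Flink version:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URL;

    public class TaskManagerCheck {
        public static void main(String[] args) throws Exception {
            // Hypothetical host; the endpoint returns JSON listing each registered TaskManager.
            URL url = new URL("http://flink01:8081/taskmanagers");
            try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }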