[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834770#comment-15834770 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97343787 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in //- // Serialization //- + public static class MetricDumpSerializer { + private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); /** * Serializes the given metrics and returns the resulting byte array. +* +* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned --- End diff -- Very nice > MetricQueryService throws NullPointerException on JobManager > > > Key: FLINK-5464 > URL: https://issues.apache.org/jira/browse/FLINK-5464 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 1.2.0 >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > I'm using Flink 699f4b0. > My JobManager log contains many of these log entries: > {code} > 2017-01-11 19:42:05,778 WARN > org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching > metrics failed. 
> akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms] > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334) > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) > at > scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429) > at > akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381) > at java.lang.Thread.run(Thread.java:745) > 2017-01-11 19:42:07,765 WARN > org.apache.flink.runtime.metrics.dump.MetricQueryService - An exception > occurred while processing a message. 
> java.lang.NullPointerException > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47) > at > org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90) > at > org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:467) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
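The serializer Javadoc quoted above says that a {@link Metric} whose accessor throws is omitted from the result. The trace shows the NPE coming from `serializeGauge`. Below is a minimal standalone sketch of one way to do that omission, assuming gauges are modeled as plain `java.util.function.Supplier`s and written as (name, value-string) pairs. `GaugeDumpSketch` and `Result` are hypothetical names, not Flink's API. The value is resolved before any bytes are written, so a failing gauge cannot leave a half-written record in the stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class GaugeDumpSketch {

    /** Serialized bytes plus the number of gauges that were actually written. */
    public static final class Result {
        public final byte[] data;
        public final int numGauges;

        Result(byte[] data, int numGauges) {
            this.data = data;
            this.numGauges = numGauges;
        }
    }

    /**
     * Writes each gauge as (name, value.toString()). A gauge whose supplier throws,
     * returns null, or whose toString() throws or returns null is skipped.
     */
    public static Result serializeGauges(Map<String, Supplier<?>> gauges) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int written = 0;
        try {
            for (Map.Entry<String, Supplier<?>> entry : gauges.entrySet()) {
                String value;
                try {
                    // Resolve the value before writing anything for this gauge.
                    Object raw = entry.getValue().get();
                    if (raw == null) {
                        continue;
                    }
                    value = raw.toString();
                } catch (RuntimeException e) {
                    continue; // omit the failing gauge instead of aborting the dump
                }
                if (value == null) {
                    continue;
                }
                out.writeUTF(entry.getKey());
                out.writeUTF(value);
                written++;
            }
            out.flush();
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrown unchecked for simplicity.
            throw new UncheckedIOException(e);
        }
        return new Result(bytes.toByteArray(), written);
    }

    public static void main(String[] args) {
        Map<String, Supplier<?>> gauges = new LinkedHashMap<>();
        gauges.put("ok", () -> 42);
        gauges.put("nullValue", () -> null);
        gauges.put("throws", () -> { throw new IllegalStateException("boom"); });
        System.out.println(serializeGauges(gauges).numGauges); // prints 1
    }
}
```

Returning the count of written gauges alongside the bytes lets the reader know how many records to expect even when some were dropped.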
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834724#comment-15834724 ] ASF GitHub Bot commented on FLINK-5464: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3128 @uce I've addressed your comments.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834655#comment-15834655 ] ASF GitHub Bot commented on FLINK-5464: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97327675 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { + DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length); - List metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters); - for (int x = 0; x < numCounters; x++) { - metrics.add(deserializeCounter(dis)); + for (int x = 0; x < data.numCounters; x++) { + try { + metrics.add(deserializeCounter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numGauges; x++) { - metrics.add(deserializeGauge(dis)); + for (int x = 0; x < data.numGauges; x++) { + try { + metrics.add(deserializeGauge(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numHistograms; x++) { - metrics.add(deserializeHistogram(dis)); + for (int x = 0; x < data.numHistograms; x++) { + try { + metrics.add(deserializeHistogram(in)); + } catch 
(Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numMeters; x++) { - metrics.add(deserializeMeter(dis)); + for (int x = 0; x < data.numMeters; x++) { + try { + metrics.add(deserializeMeter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - - return metrics; } } - private static String deserializeString(DataInputStream dis) throws IOException { - int stringLength = dis.readInt(); - byte[] bytes = new byte[stringLength]; - dis.readFully(bytes); - return new String(bytes); - } - private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException { + private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException { QueryScopeInfo scope = deserializeMetricInfo(dis); - String name = deserializeString(dis); - return new MetricDump.CounterDump(scope, name, dis.readLong()); + String name = dis.readUTF(); + long count = dis.readLong(); + return new MetricDump.CounterDump(scope, name, count); --- End diff -- Personally, for short methods I think it's overkill. I would do it for methods like `deserializeHistogram`, though.
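For longer records like histograms, reading each field into a named local makes it easy to check the read order against the write order. (Java evaluates method arguments left to right, so inline reads are also correct; the locals are for readability.) A minimal sketch, assuming a hypothetical `HistogramSummary` with only four statistics; it is not Flink's `MetricDump.HistogramDump` and omits the scope info.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class HistogramDumpSketch {

    /** Hypothetical histogram summary, used only for illustration. */
    public static final class HistogramSummary {
        public final String name;
        public final long min;
        public final long max;
        public final double mean;
        public final double p99;

        public HistogramSummary(String name, long min, long max, double mean, double p99) {
            this.name = name;
            this.min = min;
            this.max = max;
            this.mean = mean;
            this.p99 = p99;
        }
    }

    static void serialize(DataOutput out, HistogramSummary h) throws IOException {
        out.writeUTF(h.name);
        out.writeLong(h.min);
        out.writeLong(h.max);
        out.writeDouble(h.mean);
        out.writeDouble(h.p99);
    }

    static HistogramSummary deserialize(DataInput in) throws IOException {
        // One named local per field, read in exactly the order serialize() wrote them.
        String name = in.readUTF();
        long min = in.readLong();
        long max = in.readLong();
        double mean = in.readDouble();
        double p99 = in.readDouble();
        return new HistogramSummary(name, min, max, mean, p99);
    }

    /** Serializes and immediately deserializes; handy for tests. */
    public static HistogramSummary roundTrip(HistogramSummary h) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(new DataOutputStream(bytes), h); // DataOutputStream does not buffer
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```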
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834654#comment-15834654 ] ASF GitHub Bot commented on FLINK-5464: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97327512 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { + DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length); - List metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters); - for (int x = 0; x < numCounters; x++) { - metrics.add(deserializeCounter(dis)); + for (int x = 0; x < data.numCounters; x++) { + try { + metrics.add(deserializeCounter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numGauges; x++) { - metrics.add(deserializeGauge(dis)); + for (int x = 0; x < data.numGauges; x++) { + try { + metrics.add(deserializeGauge(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numHistograms; x++) { - metrics.add(deserializeHistogram(dis)); + for (int x = 0; x < data.numHistograms; x++) { + try { + metrics.add(deserializeHistogram(in)); + } catch 
(Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numMeters; x++) { - metrics.add(deserializeMeter(dis)); + for (int x = 0; x < data.numMeters; x++) { + try { + metrics.add(deserializeMeter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - - return metrics; --- End diff -- It appears I forgot to push some of the last-minute fixes :/
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834510#comment-15834510 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97315164 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { + public final byte[] data; + public final int numCounters; + public final int numGauges; + public final int numMeters; + public final int numHistograms; + + public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) { + this.data = data; --- End diff -- Let's add simple sanity checks `checkNotNull(data)` and `checkArgument(num* >= 0)` for the other fields.
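The suggested sanity checks could look like the sketch below. It is a standalone class that mirrors the fields from the diff; it uses the JDK's `Objects.requireNonNull` and a small hypothetical `checkNonNegative` helper in place of Flink's `Preconditions.checkNotNull` / `checkArgument`, so it compiles without Flink on the classpath.

```java
import java.util.Objects;

/** Standalone sketch mirroring the fields of MetricSerializationResult from the diff. */
public final class MetricSerializationResult {
    public final byte[] data;
    public final int numCounters;
    public final int numGauges;
    public final int numMeters;
    public final int numHistograms;

    public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
        // Equivalent in spirit to checkNotNull(data).
        this.data = Objects.requireNonNull(data, "data");
        // Equivalent in spirit to checkArgument(num* >= 0).
        this.numCounters = checkNonNegative(numCounters, "numCounters");
        this.numGauges = checkNonNegative(numGauges, "numGauges");
        this.numMeters = checkNonNegative(numMeters, "numMeters");
        this.numHistograms = checkNonNegative(numHistograms, "numHistograms");
    }

    /** Hypothetical helper: rejects negative counts with a descriptive message. */
    private static int checkNonNegative(int value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " must be >= 0, was " + value);
        }
        return value;
    }
}
```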
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834507#comment-15834507 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97314773 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { + public final byte[] data; --- End diff -- Wondering whether to call this `serializedMetrics`.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834508#comment-15834508 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97316854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { --- End diff -- In line 230, an empty line is missing before `class MetricDumpDeserializer`.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834511#comment-15834511 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97317434 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { + DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length); - List metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters); - for (int x = 0; x < numCounters; x++) { - metrics.add(deserializeCounter(dis)); + for (int x = 0; x < data.numCounters; x++) { + try { + metrics.add(deserializeCounter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numGauges; x++) { - metrics.add(deserializeGauge(dis)); + for (int x = 0; x < data.numGauges; x++) { + try { + metrics.add(deserializeGauge(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numHistograms; x++) { - metrics.add(deserializeHistogram(dis)); + for (int x = 0; x < data.numHistograms; x++) { + try { + metrics.add(deserializeHistogram(in)); + } catch 
(Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numMeters; x++) { - metrics.add(deserializeMeter(dis)); + for (int x = 0; x < data.numMeters; x++) { + try { + metrics.add(deserializeMeter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - - return metrics; --- End diff -- Code does not compile because of the missing return.
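A compiling shape for the per-type loops, sketched under simplifying assumptions: records are just (name UTF, value) with no scope info, only counters and gauges are handled, and `DeserializeSketch` / `Dump` are hypothetical names. It keeps the `return metrics;` the diff dropped and names the actual metric type in each warning; in the diff, all four loops log "Failed to deserialize counter.". One caveat that also applies to the diff: catching per record only helps if a failure leaves the stream at the start of the next record. A record that fails halfway through leaves the subsequent reads misaligned.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DeserializeSketch {
    private static final Logger LOG = Logger.getLogger(DeserializeSketch.class.getName());

    /** Hypothetical dump record: metric name plus its value rendered as a string. */
    public static final class Dump {
        public final String name;
        public final String value;

        Dump(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    public static List<Dump> deserialize(byte[] data, int numCounters, int numGauges) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<Dump> metrics = new ArrayList<>(numCounters + numGauges);

        for (int x = 0; x < numCounters; x++) {
            try {
                String name = in.readUTF();
                long count = in.readLong();
                metrics.add(new Dump(name, Long.toString(count)));
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Failed to deserialize counter.", e);
            }
        }
        for (int x = 0; x < numGauges; x++) {
            try {
                String name = in.readUTF();
                String value = in.readUTF();
                metrics.add(new Dump(name, value));
            } catch (IOException e) {
                // The message names the metric type actually being read.
                LOG.log(Level.WARNING, "Failed to deserialize gauge.", e);
            }
        }
        // Without this statement the method does not compile.
        return metrics;
    }
}
```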
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834514#comment-15834514 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97314569 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -17,19 +17,20 @@ */ package org.apache.flink.runtime.metrics.dump; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Meter; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; --- End diff -- In line 49 (above the `LOG` field) an empty line is missing.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834517#comment-15834517 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97315205 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { + public final byte[] data; + public final int numCounters; + public final int numGauges; + public final int numMeters; + public final int numHistograms; + + public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) { + this.data = data; + this.numCounters = numCounters; + this.numGauges = numGauges; + this.numMeters = numMeters; + this.numHistograms = numHistograms; + } + } + //- // Serialization //- public static class MetricDumpSerializer { - private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - private DataOutputStream dos = new DataOutputStream(baos); + private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32); --- End diff -- Empty line missing before this line.
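The hunk above replaces the `ByteArrayOutputStream`/`DataOutputStream` pair with a single `DataOutputSerializer` field (32 KiB initial size) that the serializer reuses across requests; the serialize hunk in this PR calls `buffer.clear()` first and returns `buffer.getCopyOfBuffer()`. The following is a minimal plain-`java.io` sketch of that reuse pattern, assuming `ByteArrayOutputStream` as a stand-in for Flink's `DataOutputSerializer`; the class `ReusableDumpBuffer` and its method are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical serializer that owns one growable buffer and reuses it for every dump.
// ByteArrayOutputStream stands in for Flink's DataOutputSerializer in this sketch.
class ReusableDumpBuffer {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream(32 * 1024);
    private final DataOutputStream out = new DataOutputStream(bytes);

    byte[] serialize(String[] names, long[] counts) {
        bytes.reset(); // forget the previous dump but keep the allocated capacity
        try {
            for (int i = 0; i < names.length; i++) {
                out.writeUTF(names[i]);
                out.writeLong(counts[i]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Return a copy: the internal array is overwritten by the next call.
        return bytes.toByteArray();
    }
}
```

Returning a copy costs one array copy per dump, but it keeps the bytes handed to the caller stable while the same buffer is reused for the next request.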
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834513#comment-15834513 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97315979 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -64,122 +80,148 @@ private MetricDumpSerialization() { * @param gauges gauges to serialize * @param histograms histograms to serialize * @return byte array containing the serialized metrics -* @throws IOException */ - public byte[] serialize( + public MetricSerializationResult serialize( Map> counters, Map > gauges, Map > histograms, - Map > meters) throws IOException { - - baos.reset(); - dos.writeInt(counters.size()); - dos.writeInt(gauges.size()); - dos.writeInt(histograms.size()); - dos.writeInt(meters.size()); + Map > meters) { + + buffer.clear(); + int numCounters = 0; for (Map.Entry > entry : counters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeCounter(dos, entry.getKey()); + try { + serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numCounters++; + } catch (Exception e) { + LOG.warn("Failed to serialize counter.", e); + } } + int numGauges = 0; for (Map.Entry > entry : gauges.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeGauge(dos, entry.getKey()); + try { + serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numGauges++; + } catch (Exception e) { + LOG.warn("Failed to serialize gauge.", e); + } } + int numHistograms = 0; for (Map.Entry > entry : histograms.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeHistogram(dos, entry.getKey()); + try { + 
serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numHistograms++; + } catch (Exception e) { + LOG.warn("Failed to serialize histogram.", e); + } } + int numMeters = 0; for (Map.Entry > entry : meters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeMeter(dos, entry.getKey()); + try { + serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numMeters++; + } catch (Exception e) { + LOG.warn("Failed to serialize meter.", e); + } } - return baos.toByteArray(); + return new MetricSerializationResult(buffer.getCopyOfBuffer(),
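The new `serialize` wraps each metric in its own `try`/`catch` and counts only the metrics that were written. One detail to keep in mind with a shared output buffer: if a metric throws after some of its bytes were already written, those bytes stay in the buffer and the reader loses alignment. The sketch below shows one way to guard against that by writing each record into a scratch buffer and appending it only on success. This is a swapped-in technique, not what the PR does; `GaugeDump`, `IsolatingGaugeSerializer`, and modeling a gauge as a `java.util.function.Supplier` are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical result holder: the serialized bytes plus the number of records they contain.
class GaugeDump {
    final byte[] data;
    final int numGauges;

    GaugeDump(byte[] data, int numGauges) {
        this.data = data;
        this.numGauges = numGauges;
    }
}

// Hypothetical serializer; each gauge is modeled as a Supplier whose get() may throw,
// like a user-defined Gauge#getValue().
class IsolatingGaugeSerializer {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
    private final DataOutputStream scratchOut = new DataOutputStream(scratch);

    GaugeDump serialize(Map<String, Supplier<Object>> gauges) {
        buffer.reset();
        int numGauges = 0;
        for (Map.Entry<String, Supplier<Object>> entry : gauges.entrySet()) {
            scratch.reset(); // discard whatever a previously failed record left behind
            try {
                scratchOut.writeUTF(entry.getKey());
                // Throws NullPointerException if the gauge returns null.
                scratchOut.writeUTF(entry.getValue().get().toString());
                scratch.writeTo(buffer); // append only complete records
                numGauges++;
            } catch (RuntimeException | IOException e) {
                // Skip this gauge; none of its bytes reached 'buffer'.
            }
        }
        return new GaugeDump(buffer.toByteArray(), numGauges);
    }
}
```

Because the count reflects only records that actually reached the buffer, a reader that trusts `numGauges` never tries to decode a half-written record.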
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834509#comment-15834509 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97315450 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -64,122 +80,148 @@ private MetricDumpSerialization() { * @param gauges gauges to serialize * @param histograms histograms to serialize * @return byte array containing the serialized metrics -* @throws IOException */ - public byte[] serialize( + public MetricSerializationResult serialize( Map> counters, Map > gauges, Map > histograms, - Map > meters) throws IOException { - - baos.reset(); - dos.writeInt(counters.size()); - dos.writeInt(gauges.size()); - dos.writeInt(histograms.size()); - dos.writeInt(meters.size()); + Map > meters) { + + buffer.clear(); + int numCounters = 0; for (Map.Entry > entry : counters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeCounter(dos, entry.getKey()); + try { + serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numCounters++; + } catch (Exception e) { + LOG.warn("Failed to serialize counter.", e); + } } + int numGauges = 0; for (Map.Entry > entry : gauges.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeGauge(dos, entry.getKey()); + try { + serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numGauges++; + } catch (Exception e) { + LOG.warn("Failed to serialize gauge.", e); + } } + int numHistograms = 0; for (Map.Entry > entry : histograms.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeHistogram(dos, entry.getKey()); + try { + 
serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numHistograms++; + } catch (Exception e) { + LOG.warn("Failed to serialize histogram.", e); + } } + int numMeters = 0; for (Map.Entry > entry : meters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeMeter(dos, entry.getKey()); + try { + serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numMeters++; + } catch (Exception e) { + LOG.warn("Failed to serialize meter.", e); + } } - return baos.toByteArray(); + return new MetricSerializationResult(buffer.getCopyOfBuffer(),
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834520#comment-15834520 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97316717 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { + public final byte[] data; + public final int numCounters; + public final int numGauges; + public final int numMeters; + public final int numHistograms; + + public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) { --- End diff -- Let's decrease the visibility of the constructor as much as possible.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834516#comment-15834516 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97317481 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { + DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length); - List metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters); - for (int x = 0; x < numCounters; x++) { - metrics.add(deserializeCounter(dis)); + for (int x = 0; x < data.numCounters; x++) { + try { + metrics.add(deserializeCounter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numGauges; x++) { - metrics.add(deserializeGauge(dis)); + for (int x = 0; x < data.numGauges; x++) { + try { + metrics.add(deserializeGauge(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numHistograms; x++) { - metrics.add(deserializeHistogram(dis)); + for (int x = 0; x < data.numHistograms; x++) { + try { + metrics.add(deserializeHistogram(in)); + } catch 
(Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - for (int x = 0; x < numMeters; x++) { - metrics.add(deserializeMeter(dis)); + for (int x = 0; x < data.numMeters; x++) { + try { + metrics.add(deserializeMeter(in)); + } catch (Exception e) { + LOG.warn("Failed to deserialize counter.", e); + } } - - return metrics; } } - private static String deserializeString(DataInputStream dis) throws IOException { - int stringLength = dis.readInt(); - byte[] bytes = new byte[stringLength]; - dis.readFully(bytes); - return new String(bytes); - } - private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException { + private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException { QueryScopeInfo scope = deserializeMetricInfo(dis); - String name = deserializeString(dis); - return new MetricDump.CounterDump(scope, name, dis.readLong()); + String name = dis.readUTF(); + long count = dis.readLong(); + return new MetricDump.CounterDump(scope, name, count); --- End diff -- Should we add an empty line before the `return`s?
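With this change, `deserialize` no longer reads four counts from the head of the byte array; it takes the counts from the `MetricSerializationResult` and reads each record in the same field order the serializer wrote (`readUTF` for the name, then `readLong` for a counter). A minimal round-trip sketch of that symmetry in plain `java.io`, assuming `DataOutputStream`/`DataInputStream` as stand-ins for Flink's `DataOutputSerializer`/`DataInputDeserializer`; `CounterRecord` and `SymmetricCounterCodec` are hypothetical names, and scope information is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type, loosely mirroring MetricDump.CounterDump (name + count only).
class CounterRecord {
    final String name;
    final long count;

    CounterRecord(String name, long count) {
        this.name = name;
        this.count = count;
    }
}

class SymmetricCounterCodec {
    // Write side: field order is name, then count.
    static byte[] write(List<CounterRecord> counters) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            for (CounterRecord c : counters) {
                out.writeUTF(c.name);
                out.writeLong(c.count);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read side: same field order; the record count is passed in alongside the bytes
    // instead of being read from a header inside the byte array.
    static List<CounterRecord> read(byte[] data, int numCounters) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        List<CounterRecord> result = new ArrayList<>(numCounters);
        try {
            for (int i = 0; i < numCounters; i++) {
                String name = in.readUTF();
                long count = in.readLong();

                result.add(new CounterRecord(name, count));
            }
        } catch (IOException e) {
            // After a partial read the position is no longer on a record boundary, so stop.
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

Design note: this sketch stops at the first failed read, because after a partial read the stream position no longer lines up with a record boundary. The diff instead logs and continues with the next record, which is only safe if a failure never leaves the position in the middle of a record.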
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834519#comment-15834519 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97314949 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { --- End diff -- Can you add a short class-level comment about why we added this? The problem with determining the number of metrics beforehand, etc.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834512#comment-15834512 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97316605 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -64,122 +80,148 @@ private MetricDumpSerialization() { * @param gauges gauges to serialize * @param histograms histograms to serialize * @return byte array containing the serialized metrics -* @throws IOException */ - public byte[] serialize( + public MetricSerializationResult serialize( Map> counters, Map > gauges, Map > histograms, - Map > meters) throws IOException { - - baos.reset(); - dos.writeInt(counters.size()); - dos.writeInt(gauges.size()); - dos.writeInt(histograms.size()); - dos.writeInt(meters.size()); + Map > meters) { + + buffer.clear(); + int numCounters = 0; for (Map.Entry > entry : counters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeCounter(dos, entry.getKey()); + try { + serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numCounters++; + } catch (Exception e) { + LOG.warn("Failed to serialize counter.", e); + } } + int numGauges = 0; for (Map.Entry > entry : gauges.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeGauge(dos, entry.getKey()); + try { + serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numGauges++; + } catch (Exception e) { + LOG.warn("Failed to serialize gauge.", e); + } } + int numHistograms = 0; for (Map.Entry > entry : histograms.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeHistogram(dos, entry.getKey()); + try { + 
serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numHistograms++; + } catch (Exception e) { + LOG.warn("Failed to serialize histogram.", e); + } } + int numMeters = 0; for (Map.Entry > entry : meters.entrySet()) { - serializeMetricInfo(dos, entry.getValue().f0); - serializeString(dos, entry.getValue().f1); - serializeMeter(dos, entry.getKey()); + try { + serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + numMeters++; + } catch (Exception e) { + LOG.warn("Failed to serialize meter.", e); --- End diff -- Should we decrease the log level to `debug` (here and the other lines)? The user won't be able to act on the warning and in most cases this should work and we don't
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834515#comment-15834515 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97316951 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE * * @param data serialized metrics * @return A list containing the deserialized metrics. -* @throws IOException */ - public List deserialize(byte[] data) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(data); - DataInputStream dis = new DataInputStream(bais); - int numCounters = dis.readInt(); - int numGauges = dis.readInt(); - int numHistograms = dis.readInt(); - int numMeters = dis.readInt(); + public List deserialize(MetricDumpSerialization.MetricSerializationResult data) { --- End diff -- Furthermore, after that line. And there is an empty line after the JavaDoc of `deserialize`.
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834518#comment-15834518 ] ASF GitHub Bot commented on FLINK-5464: --- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3128#discussion_r97319035 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -50,12 +51,27 @@ private MetricDumpSerialization() { } + public static class MetricSerializationResult { --- End diff -- This is not `Serializable` and would fail when sent with Akka. The following test fails:

```java
@Test
public void testJavaSerialization() throws IOException {
	MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();

	final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
	final ObjectOutputStream oos = new ObjectOutputStream(bos);

	// fails with java.io.NotSerializableException: MetricSerializationResult does not implement Serializable
	oos.writeObject(serializer.serialize(
		new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
		new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
		new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
		new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
}
```
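The failing test follows from `MetricSerializationResult` not implementing `java.io.Serializable`: `ObjectOutputStream.writeObject` throws `NotSerializableException` for such objects, and a message sent to a remote actor has to pass through a serializer (Java serialization, assuming Akka's default configuration). A minimal sketch of a serializable holder that also folds in the two other review requests on this class, a package-private constructor and a class-level comment explaining the counts; `SerializableDumpResult` and `roundTrip` are hypothetical names, not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/**
 * Hypothetical holder for one metric dump. The per-type counts travel next to the bytes
 * because metrics that fail to serialize are skipped, so the counts are only known once
 * serialization has finished; writing the map sizes up front, as the old code did, could
 * announce records that were never written.
 */
class SerializableDumpResult implements Serializable {
    private static final long serialVersionUID = 1L;

    final byte[] data;
    final int numCounters;
    final int numGauges;
    final int numMeters;
    final int numHistograms;

    // Package-private so that only code in the same package can create results.
    SerializableDumpResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
        this.data = data;
        this.numCounters = numCounters;
        this.numGauges = numGauges;
        this.numMeters = numMeters;
        this.numHistograms = numHistograms;
    }

    // Plain Java serialization round trip; writeObject would throw
    // NotSerializableException if the class did not implement Serializable.
    static SerializableDumpResult roundTrip(SerializableDumpResult original) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(original);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableDumpResult) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```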
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828361#comment-15828361 ] ASF GitHub Bot commented on FLINK-5464: ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3128

I've had an offline chat with @rmetzger and @uce. We agreed that using a ByteBuffer and resizing it manually was a bit undesirable. Instead, we opted for the following approach:

* use DataOutputSerializer instead of DataOutputStream; it is a bit more efficient for strings, which make up the majority of the serialized data, and is also backed by a resizing array
* restructure the serialize methods to be symmetric with the deserialize methods
* access the metric values before serializing anything and reduce them to primitives or strings. The assumption is that if this succeeds, the subsequent serialization will also succeed, and can then only fail due to critical errors that prevent serialization entirely, or due to programming errors on our part.
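The third bullet above ("access first, serialize second") can be sketched with plain JDK streams. This is an illustration under assumptions, not the merged code:

- A `DataOutputStream` over a `ByteArrayOutputStream` stands in for Flink's `DataOutputSerializer`.
- The `Gauge` interface and the `snapshot`/`serialize`/`deserialize` helpers are hypothetical.

Every gauge is read and reduced to a `String` up front. Gauges that are null, return null, or throw are dropped. The write phase then only handles plain strings, so the count it writes always matches the entries that follow.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical gauge shape: a single method returning the current value.
interface Gauge<T> {
    T getValue();
}

class SnapshotFirstSerializer {

    /** Phase 1: read every gauge and reduce its value to a String; skip anything that fails. */
    static Map<String, String> snapshot(Map<String, Gauge<?>> gauges) {
        Map<String, String> values = new LinkedHashMap<>();
        for (Map.Entry<String, Gauge<?>> entry : gauges.entrySet()) {
            try {
                // NPE here if the gauge itself is null; getValue() may also throw.
                Object value = entry.getValue().getValue();
                if (value != null) {
                    values.put(entry.getKey(), value.toString());
                }
            } catch (RuntimeException e) {
                // Skip this gauge instead of failing the whole dump.
            }
        }
        return values;
    }

    /** Phase 2: only plain strings are written, so the count is known up front. */
    static byte[] serialize(Map<String, String> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(values.size());
        for (Map.Entry<String, String> entry : values.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Mirror image of serialize(): same order, same types. */
    static Map<String, String> deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String name = in.readUTF();
            values.put(name, in.readUTF());
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Gauge<?>> gauges = new LinkedHashMap<>();
        gauges.put("ok", () -> 42);
        gauges.put("nullValue", () -> null);
        gauges.put("throws", () -> { throw new IllegalStateException("boom"); });
        gauges.put("nullGauge", null);
        System.out.println(deserialize(serialize(snapshot(gauges)))); // {ok=42}
    }
}
```

Keeping `serialize` and `deserialize` as mirror images (same order, same types) is what lets the reader trust the count header.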
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823880#comment-15823880 ] ASF GitHub Bot commented on FLINK-5464: ---
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3128

[FLINK-5464] Improve MetricDumpSerialization error handling

Rework of #3103.

The key change introduced in the previous PR remains: if a gauge returns null, it is not serialized. However, I've extended the PR to harden the entire serialization process against exceptions. The major gain here is that a single failed serialization no longer destroys the entire dump; instead, the failing metric is simply omitted.

To allow that, I had to replace the ```OutputStream```s with a ```ByteBuffer```. The former doesn't really allow you to handle failures in between serialization steps, as you can't reset the stream in any way. The ```ByteBuffer``` is manually resized if a ```BufferOverflowException``` occurs.

* ```MetricDump(De)Serializer#(de)serialize``` will no longer throw any exception, but catch and log them instead
* Exceptions during the serialization of a metric will cause that metric to be skipped.
* added test for handling of gauge returning null
* added test for manual resizing of backing array

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 5464_mqs_npe

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3128.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #3128

commit 8610a47c407afe2140cd4b5651ebc794ef3feec8
Author: zentol
Date: 2017-01-12T11:41:56Z
[FLINK-5464] [metrics] Ignore metrics that are null

commit 442c0a4dee002b73e5b86d6c7bb274484a8900ac
Author: zentol
Date: 2017-01-16T10:25:58Z
[hotfix] Remove unused variable in MetricDumpSerializerTest

commit 0f813ebf53414b1b68c6dfe8e3e1dbc896054c36
Author: zentol
Date: 2017-01-12T11:42:26Z
[FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling
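The rollback-and-resize behaviour described in this PR can be illustrated with `java.nio.ByteBuffer` alone. This is a sketch under assumptions, not the PR's implementation:

- `ResizingBufferWriter` is a hypothetical name.
- Entries are length-prefixed UTF-8 strings.
- The first four bytes are reserved for an entry count that is patched in at the end.
- The capacity simply doubles on each overflow.

Because the position can be reset to where an entry started, a `BufferOverflowException` leads to grow-and-retry, while any other failure drops only that entry.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ResizingBufferWriter {

    private ByteBuffer buffer;
    private int count;

    ResizingBufferWriter(int initialCapacity) {
        // Reserve the first 4 bytes for the entry count, which is only known at the end.
        buffer = ByteBuffer.allocate(Math.max(4, initialCapacity));
        buffer.position(4);
    }

    /** Doubles the capacity and copies over everything written so far. */
    private void grow() {
        ByteBuffer bigger = ByteBuffer.allocate(buffer.capacity() * 2);
        buffer.flip();        // limit = bytes written, position = 0
        bigger.put(buffer);   // bigger's position ends right after the copied bytes
        buffer = bigger;
    }

    /** Writes one length-prefixed string; a failed entry leaves earlier entries untouched. */
    void write(String value) {
        while (true) {
            int start = buffer.position();
            try {
                byte[] bytes = value.getBytes(StandardCharsets.UTF_8); // NPE for a null value
                buffer.putInt(bytes.length);
                buffer.put(bytes);
                count++;
                return;
            } catch (BufferOverflowException e) {
                buffer.position(start); // drop the partial entry, grow, and retry
                grow();
            } catch (RuntimeException e) {
                buffer.position(start); // drop the partial entry and skip it
                return;
            }
        }
    }

    int count() {
        return count;
    }

    /** Patches the count into the reserved slot and returns the written bytes. */
    byte[] finish() {
        buffer.putInt(0, count);
        return Arrays.copyOf(buffer.array(), buffer.position());
    }

    /** Reads the format produced above. */
    static List<String> read(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int n = in.getInt();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            byte[] bytes = new byte[in.getInt()];
            in.get(bytes);
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) {
        ResizingBufferWriter writer = new ResizingBufferWriter(8);
        writer.write("a");
        writer.write(null);
        writer.write("a longer value that needs a bigger buffer");
        System.out.println(read(writer.finish())); // [a, a longer value that needs a bigger buffer]
    }
}
```

Resetting the position is what a plain `OutputStream` cannot do, which is the limitation the PR description points out. The price is the manual copy in `grow()`.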
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822472#comment-15822472 ] ASF GitHub Bot commented on FLINK-5464: ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3103

yeah, I'll have to rethink this :/ I wanted to just ignore gauges returning null (since nothing in the web interface accounts for that case), but did not adjust the count of metrics that are submitted. urgh.
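The bug described here is a mismatch between the number of gauges announced and the number actually written. Below is a minimal sketch of the fix. It assumes a hypothetical `Result` type that carries the payload together with a gauge count. The counter is incremented only after a value has reached the stream, so null gauges are skipped without desynchronizing the reader. `writeUTF`/`readUTF` are used for brevity; the real serialization format may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CountedGaugeDump {

    /** Hypothetical result type: the payload plus the number of gauges actually written. */
    static final class Result {
        final byte[] data;
        final int numGauges;

        Result(byte[] data, int numGauges) {
            this.data = data;
            this.numGauges = numGauges;
        }
    }

    static Result serialize(List<Object> gaugeValues) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int written = 0;
        for (Object value : gaugeValues) {
            if (value == null) {
                continue;              // ignored, and therefore not counted
            }
            out.writeUTF(value.toString());
            written++;                 // count only what actually reached the stream
        }
        out.flush();
        return new Result(bytes.toByteArray(), written);
    }

    static List<String> deserialize(Result result) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.data));
        List<String> values = new ArrayList<>();
        for (int i = 0; i < result.numGauges; i++) {
            values.add(in.readUTF());
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        Result result = serialize(Arrays.<Object>asList(1, null, "x"));
        System.out.println(result.numGauges + " " + deserialize(result)); // 2 [1, x]
    }
}
```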
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822473#comment-15822473 ] ASF GitHub Bot commented on FLINK-5464: ---
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/3103
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821920#comment-15821920 ] ASF GitHub Bot commented on FLINK-5464: ---
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3103

While testing this PR, I found that the jobmanager.log is now full of exceptions like this one:

```
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeString(MetricDumpSerialization.java:230)
    at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeMetricInfo(MetricDumpSerialization.java:278)
    at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeGauge(MetricDumpSerialization.java:243)
    at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$800(MetricDumpSerialization.java:47)
    at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpDeserializer.deserialize(MetricDumpSerialization.java:214)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:196)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
    at akka.dispatch.OnSuccess.internal(Future.scala:212)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
```
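The `EOFException` in this trace is what a header-driven reader produces when it is promised more entries than the writer emitted. Below is a minimal JDK-only reproducer. It assumes length-prefixed strings: the trace shows `readInt` inside `deserializeString`, which suggests but does not prove this layout. `CountMismatchRepro` and its methods are hypothetical. The header announces three values, the null one is skipped, and the third `readInt` runs off the end of the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CountMismatchRepro {

    /** Buggy writer: the header counts every value, but null values are silently skipped. */
    static byte[] writeWithStaleCount(List<String> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(values.size()); // stale: includes values that are skipped below
        for (String value : values) {
            if (value == null) {
                continue;
            }
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // length prefix
            out.write(utf8);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Reader that trusts the header, as a symmetric deserializer would. */
    static List<String> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] utf8 = new byte[in.readInt()]; // EOFException once the data runs out
            in.readFully(utf8);
            result.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] dump = writeWithStaleCount(Arrays.asList("a", null, "b"));
        try {
            System.out.println(read(dump));
        } catch (EOFException e) {
            System.out.println("read failed with EOFException");
        }
    }
}
```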
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821684#comment-15821684 ] ASF GitHub Bot commented on FLINK-5464: ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3103

cc @rmetzger
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820823#comment-15820823 ] ASF GitHub Bot commented on FLINK-5464: ---
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3103

[FLINK-5464] [metrics] Prevent some NPEs

This PR prevents some NullPointerExceptions from occurring in the metric system.

- When a metric that is null is registered, it is ignored and a warning is logged.
  - e.g. ```group.counter("counter", null);```
- The MetricDumpSerialization completely ignores gauges whose value is null.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 5464_mqs_npe

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3103.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #3103

commit 0912848b3ce54842fc6810aa0b041db5547ac690
Author: zentol
Date: 2017-01-12T11:41:56Z
[FLINK-5464] [metrics] Ignore metrics that are null

commit 941c83a599221fc57c02605e2c3bc348d70aa8b2
Author: zentol
Date: 2017-01-12T11:42:26Z
[FLINK-5464] [metrics] Prevent Gauge NPE in serialization
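The first bullet (ignore a null metric at registration time and log a warning) can be sketched as follows. The sketch makes these assumptions:

- `NullSafeMetricGroup` and its `register` method are hypothetical and are not Flink's `MetricGroup` API.
- `java.util.logging` stands in for whatever logger the metric system uses.
- The warning text is illustrative only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

class NullSafeMetricGroup {

    private static final Logger LOG = Logger.getLogger(NullSafeMetricGroup.class.getName());

    private final Map<String, Object> metrics = new LinkedHashMap<>();

    /** Registers a metric; a null metric is ignored with a warning instead of failing later. */
    <M> M register(String name, M metric) {
        if (metric == null) {
            LOG.warning("Ignoring null metric registered under name '" + name + "'.");
            return null;
        }
        metrics.put(name, metric);
        return metric;
    }

    Map<String, Object> registered() {
        return Collections.unmodifiableMap(metrics);
    }

    public static void main(String[] args) {
        NullSafeMetricGroup group = new NullSafeMetricGroup();
        group.register("counter", null);             // logged and ignored
        group.register("myCounter", new Object());   // stands in for a real counter
        System.out.println(group.registered().keySet()); // [myCounter]
    }
}
```

Rejecting the null at registration means the serializer never sees case 1 (a null gauge). It still has to cope with a non-null gauge whose value is null.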
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager
[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820801#comment-15820801 ] Chesnay Schepler commented on FLINK-5464: ---
There are 2 possible cases for this: either the supplied gauge was null, or it was not null but the value it supplies is null. Neither of these cases is checked at the moment.
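Both cases end up at the same kind of call site, where the gauge's value is dereferenced without a check. The sketch below contrasts an unchecked read with a null-safe one. The nested `Gauge` interface and the `GaugeValues` helpers are hypothetical. They only mirror the shape of a gauge, a single `getValue()` method.

```java
import java.util.Optional;

class GaugeValues {

    // Hypothetical gauge shape: a single method returning the current value.
    interface Gauge<T> {
        T getValue();
    }

    /** Unchecked read: throws NullPointerException in both cases. */
    static String unchecked(Gauge<?> gauge) {
        return gauge.getValue().toString();
    }

    /** Null-safe read: empty if the gauge is null (case 1) or its value is null (case 2). */
    static Optional<String> checked(Gauge<?> gauge) {
        if (gauge == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(gauge.getValue()).map(Object::toString);
    }

    public static void main(String[] args) {
        Gauge<Integer> ok = () -> 7;
        Gauge<String> nullValue = () -> null;
        System.out.println(checked(ok));        // Optional[7]
        System.out.println(checked(null));      // Optional.empty
        System.out.println(checked(nullValue)); // Optional.empty
        try {
            unchecked(nullValue);
        } catch (NullPointerException e) {
            System.out.println("unchecked read threw NullPointerException");
        }
    }
}
```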