[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834770#comment-15834770 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97343787
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -70,16 +93,26 @@ public MetricSerializationResult(byte[] data, int numCounters, int numGauges, in

//-
// Serialization

//-
+
public static class MetricDumpSerializer {
+
private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
 
/**
 * Serializes the given metrics and returns the resulting byte array.
+* 
+* Should a {@link Metric} accessed in this method throw an exception it will be omitted from the returned
--- End diff --

Very nice 
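The Javadoc added here says that a metric whose access throws is left out of the returned dump instead of failing the whole request. A minimal sketch of that idea follows; it is not the PR's code. It assumes gauges are modelled as `Supplier<Object>` (a stand-in for Flink's `Gauge`), and `SkippingGaugeSerializer` and its `Result` type are hypothetical names. Treating a `null` gauge value as a failure is an assumption about one way `serializeGauge` can run into the `NullPointerException` reported in this issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical sketch, not Flink's code: gauges are modelled as
 * {@code Supplier<Object>} and written as (name, value.toString()) pairs.
 */
final class SkippingGaugeSerializer {

    /** Serialized bytes plus the number of gauges actually written. */
    static final class Result {
        final byte[] data;
        final int numGauges;

        Result(byte[] data, int numGauges) {
            this.data = data;
            this.numGauges = numGauges;
        }
    }

    static Result serialize(Map<String, Supplier<Object>> gauges) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int written = 0;
        for (Map.Entry<String, Supplier<Object>> entry : gauges.entrySet()) {
            // Each gauge goes into its own scratch buffer first, so a failure
            // halfway through never leaves a partial record in the dump.
            ByteArrayOutputStream scratch = new ByteArrayOutputStream();
            try (DataOutputStream dos = new DataOutputStream(scratch)) {
                Object value = entry.getValue().get();
                if (value == null) {
                    // Assumption: a null value is one way serializeGauge can hit an NPE.
                    throw new NullPointerException("Gauge '" + entry.getKey() + "' returned null.");
                }
                dos.writeUTF(entry.getKey());
                dos.writeUTF(value.toString());
            } catch (RuntimeException | IOException e) {
                // Omit this gauge; a real implementation would log e here.
                continue;
            }
            // The record is complete, so append it and count it.
            out.write(scratch.toByteArray(), 0, scratch.size());
            written++;
        }
        return new Result(out.toByteArray(), written);
    }
}
```

Buffering each record separately costs one extra copy per metric, but it keeps the written count and the bytes consistent, which the reader relies on when it loops over the counts.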


> MetricQueryService throws NullPointerException on JobManager
> 
>
> Key: FLINK-5464
> URL: https://issues.apache.org/jira/browse/FLINK-5464
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>
> I'm using Flink 699f4b0.
> My JobManager log contains many of these log entries:
> {code}
> 2017-01-11 19:42:05,778 WARN  org.apache.flink.runtime.webmonitor.metrics.MetricFetcher - Fetching metrics failed.
> akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/MetricQueryService#-970662317]] after [1 ms]
>   at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>   at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>   at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>   at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>   at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>   at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>   at java.lang.Thread.run(Thread.java:745)
> 2017-01-11 19:42:07,765 WARN  org.apache.flink.runtime.metrics.dump.MetricQueryService  - An exception occurred while processing a message.
> java.lang.NullPointerException
>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:162)
>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$300(MetricDumpSerialization.java:47)
>   at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:90)
>   at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834724#comment-15834724 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3128
  
@uce I've addressed your comments.




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834655#comment-15834655 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97327675
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
 
-   List metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+   List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
-
-   return metrics;
}
}
 
-   private static String deserializeString(DataInputStream dis) throws IOException {
-   int stringLength = dis.readInt();
-   byte[] bytes = new byte[stringLength];
-   dis.readFully(bytes);
-   return new String(bytes);
-   }
 
-   private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+   private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
-   String name = deserializeString(dis);
-   return new MetricDump.CounterDump(scope, name, dis.readLong());
+   String name = dis.readUTF();
+   long count = dis.readLong();
+   return new MetricDump.CounterDump(scope, name, count);
--- End diff --

Personally, for short methods I think it's overkill. I would do it for methods like `deserializeHistogram`, though.
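The hunk drops the hand-written `deserializeString`, which decoded the name with `new String(bytes)` and therefore with the platform default charset, in favor of `readUTF()`, and it reads the count into a local before building the dump. A minimal round trip, assuming the serializer side writes the name with `writeUTF`, could look like the following. `CounterDumpSketch` is a hypothetical stand-in for `MetricDump.CounterDump` with the scope info omitted; it is written against `java.io.DataOutput`/`DataInput`, the interface that Flink's `DataInputView` extends.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for MetricDump.CounterDump; the scope info is omitted. */
final class CounterDumpSketch {
    final String name;
    final long count;

    CounterDumpSketch(String name, long count) {
        this.name = name;
        this.count = count;
    }

    /** Writes the fields in a fixed order: name first, then count. */
    static void serialize(DataOutput out, String name, long count) throws IOException {
        // writeUTF emits a 2-byte length prefix plus modified UTF-8, so the
        // reader does not depend on the platform's default charset.
        out.writeUTF(name);
        out.writeLong(count);
    }

    /** Reads the fields back in exactly the order they were written. */
    static CounterDumpSketch deserialize(DataInput in) throws IOException {
        // Named locals spell out the read order; for two fields this is
        // optional, for a histogram with many fields it helps review.
        String name = in.readUTF();
        long count = in.readLong();
        return new CounterDumpSketch(name, count);
    }

    /** Round trip through a byte array, wrapping the checked IOException. */
    static CounterDumpSketch roundTrip(String name, long count) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(new DataOutputStream(bytes), name, count);
            byte[] data = bytes.toByteArray();
            return deserialize(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```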



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834654#comment-15834654 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97327512
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
 
-   List metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+   List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
-
-   return metrics;
--- End diff --

It appears I forgot to push some of the last-minute fixes :/



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834510#comment-15834510 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97315164
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
+   public final byte[] data;
+   public final int numCounters;
+   public final int numGauges;
+   public final int numMeters;
+   public final int numHistograms;
+   
+   public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
+   this.data = data;
--- End diff --

Let's add simple sanity checks: `checkNotNull(data)`, and `checkArgument(num* >= 0)` for the other fields.
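A sketch of the constructor with the suggested checks follows. So that it compiles without Flink on the classpath, it uses `java.util.Objects.requireNonNull` and a small hypothetical `checkNonNegative` helper in place of Flink's `Preconditions.checkNotNull` and `checkArgument`. `MetricSerializationResultSketch` is a hypothetical name; the fields and their order follow the diff above.

```java
import java.util.Objects;

/**
 * Sketch of the result holder with the suggested sanity checks. JDK checks
 * stand in for Flink's Preconditions.checkNotNull and checkArgument.
 */
final class MetricSerializationResultSketch {
    final byte[] data;
    final int numCounters;
    final int numGauges;
    final int numMeters;
    final int numHistograms;

    MetricSerializationResultSketch(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
        // Fail fast on construction instead of later inside the deserializer.
        this.data = Objects.requireNonNull(data, "data must not be null");
        this.numCounters = checkNonNegative(numCounters, "numCounters");
        this.numGauges = checkNonNegative(numGauges, "numGauges");
        this.numMeters = checkNonNegative(numMeters, "numMeters");
        this.numHistograms = checkNonNegative(numHistograms, "numHistograms");
    }

    /** Rejects negative counts, mirroring checkArgument(num* >= 0). */
    private static int checkNonNegative(int value, String field) {
        if (value < 0) {
            throw new IllegalArgumentException(field + " must be >= 0, but was " + value);
        }
        return value;
    }
}
```

With Flink's own helpers the body would shrink to `this.data = checkNotNull(data);` followed by one `checkArgument(... >= 0)` per count.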




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834507#comment-15834507 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97314773
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
+   public final byte[] data;
--- End diff --

Wondering whether to call this `serializedMetrics`.




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834508#comment-15834508 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97316854
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List deserialize(MetricDumpSerialization.MetricSerializationResult data) {
--- End diff --

In line 230, an empty line is missing before `class MetricDumpDeserializer`.




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834511#comment-15834511 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97317434
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
 
-   List metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+   List metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
-
-   return metrics;
--- End diff --

Code does not compile because of the missing return.
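A minimal sketch of the deserialization loop with the `return` restored follows. It is simplified and hypothetical: `DumpDeserializerSketch` reads plain `java.io.DataInputStream` records that each consist of a name and a `long`, renders each metric as a string, and prints failures to `System.err` where the real code uses `LOG.warn`. Factoring the four near-identical loops into one helper also keeps the warning in step with the metric type; in the diff every loop logs "Failed to deserialize counter."

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the deserialization loop with the missing return.
 * For brevity every record is a (name, long) pair and a deserialized metric
 * is rendered as a "type:name=value" string.
 */
final class DumpDeserializerSketch {

    static List<String> deserialize(byte[] data, int numCounters, int numGauges, int numHistograms, int numMeters) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        // Capacity covers all four types; the old code left out the meters.
        List<String> metrics = new ArrayList<>(numCounters + numGauges + numHistograms + numMeters);
        readRecords(in, "counter", numCounters, metrics);
        readRecords(in, "gauge", numGauges, metrics);
        readRecords(in, "histogram", numHistograms, metrics);
        readRecords(in, "meter", numMeters, metrics);
        // The statement missing from the reviewed diff.
        return metrics;
    }

    private static void readRecords(DataInputStream in, String type, int count, List<String> metrics) {
        for (int x = 0; x < count; x++) {
            try {
                String name = in.readUTF();
                long value = in.readLong();
                metrics.add(type + ":" + name + "=" + value);
            } catch (IOException e) {
                // Report the actual metric type instead of always "counter".
                System.err.println("Failed to deserialize " + type + ": " + e);
            }
        }
    }
}
```

One caveat with catching per record: if a read fails partway through, the input is left in the middle of that record, so the following reads from the same dump will most likely fail or yield wrong values as well.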



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834514#comment-15834514 ]

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97314569
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---
@@ -17,19 +17,20 @@
  */
 package org.apache.flink.runtime.metrics.dump;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
--- End diff --

In line 49 (above the `LOG` field), an empty line is missing.


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834517#comment-15834517
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97315205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
+   public final byte[] data;
+   public final int numCounters;
+   public final int numGauges;
+   public final int numMeters;
+   public final int numHistograms;
+   
+   public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
+   this.data = data;
+   this.numCounters = numCounters;
+   this.numGauges = numGauges;
+   this.numMeters = numMeters;
+   this.numHistograms = numHistograms;
+   }
+   }
+

//-
// Serialization

//-
public static class MetricDumpSerializer {
-   private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-   private DataOutputStream dos = new DataOutputStream(baos);
+   private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
--- End diff --

Empty line missing before this line.



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834513#comment-15834513
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97315979
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
 * @param gauges gauges to serialize
 * @param histograms histograms to serialize
 * @return byte array containing the serialized metrics
-* @throws IOException
 */
-   public byte[] serialize(
+   public MetricSerializationResult serialize(
Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-   
-   baos.reset();
-   dos.writeInt(counters.size());
-   dos.writeInt(gauges.size());
-   dos.writeInt(histograms.size());
-   dos.writeInt(meters.size());
+   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+   buffer.clear();
 
+   int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeCounter(dos, entry.getKey());
+   try {
+   serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numCounters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize counter.", e);
+   }
}
 
+   int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeGauge(dos, entry.getKey());
+   try {
+   serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numGauges++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize gauge.", e);
+   }
}
 
+   int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeHistogram(dos, entry.getKey());
+   try {
+   serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numHistograms++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize histogram.", e);
+   }
}
 
+   int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeMeter(dos, entry.getKey());
+   try {
+   serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numMeters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize meter.", e);
+   }
}
-   return baos.toByteArray();
+   return new MetricSerializationResult(buffer.getCopyOfBuffer(), 

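The serializer change above drops the up-front `writeInt(map.size())` header and instead counts only the metrics that were actually written, returning those counts next to the bytes. A minimal, self-contained sketch of that idea using only `java.io` follows. It is not Flink's code: gauges are modelled as a hypothetical `Map<String, Supplier<?>>` instead of `Gauge`/`QueryScopeInfo`, and `GaugeDumpSketch` is an illustrative name. It also assumes one extra rule: each gauge is evaluated before anything is written, so a gauge that throws or returns `null` (the `NullPointerException` from `serializeGauge` reported in this issue) leaves no partial record behind.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: count only the gauges that were written successfully. */
public class GaugeDumpSketch {

    /** Serialized bytes plus the number of gauge records they contain. */
    static final class Result {
        final byte[] data;
        final int numGauges;

        Result(byte[] data, int numGauges) {
            this.data = data;
            this.numGauges = numGauges;
        }
    }

    static Result serialize(Map<String, Supplier<?>> gauges) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int numGauges = 0;

        for (Map.Entry<String, Supplier<?>> entry : gauges.entrySet()) {
            try {
                // Evaluate the gauge before writing anything, so a failing gauge
                // cannot leave a half-written record in the stream.
                Object value = entry.getValue().get();
                if (value == null) {
                    throw new NullPointerException("Gauge " + entry.getKey() + " returned null.");
                }
                String stringValue = value.toString();
                if (stringValue == null) {
                    throw new NullPointerException("toString() of gauge " + entry.getKey() + " returned null.");
                }
                out.writeUTF(entry.getKey());
                out.writeUTF(stringValue);
                numGauges++;
            } catch (RuntimeException e) {
                // Skip the broken gauge instead of failing the whole dump (real code would log it).
            }
        }

        out.flush();
        return new Result(bytes.toByteArray(), numGauges);
    }

    static List<String> deserialize(Result result) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.data));
        List<String> dumped = new ArrayList<>(result.numGauges);

        // The record count comes from the Result, not from a header inside the bytes.
        for (int i = 0; i < result.numGauges; i++) {
            String name = in.readUTF();
            String value = in.readUTF();
            dumped.add(name + "=" + value);
        }
        return dumped;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Supplier<?>> gauges = new LinkedHashMap<>();
        gauges.put("uptime", () -> 42L);
        gauges.put("broken", () -> null);
        gauges.put("status", () -> "RUNNING");

        Result result = serialize(gauges);
        System.out.println(result.numGauges);     // 2
        System.out.println(deserialize(result));  // [uptime=42, status=RUNNING]
    }
}
```

Evaluating first and writing second is what keeps the bytes aligned with the counts: a record that failed half-way through would otherwise shift every record after it. Only `RuntimeException`s are skipped in this sketch; an `IOException` from the stream still aborts the whole dump.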
[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834509#comment-15834509
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97315450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
 * @param gauges gauges to serialize
 * @param histograms histograms to serialize
 * @return byte array containing the serialized metrics
-* @throws IOException
 */
-   public byte[] serialize(
+   public MetricSerializationResult serialize(
Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-   
-   baos.reset();
-   dos.writeInt(counters.size());
-   dos.writeInt(gauges.size());
-   dos.writeInt(histograms.size());
-   dos.writeInt(meters.size());
+   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+   buffer.clear();
 
+   int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeCounter(dos, entry.getKey());
+   try {
+   serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numCounters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize counter.", e);
+   }
}
 
+   int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeGauge(dos, entry.getKey());
+   try {
+   serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numGauges++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize gauge.", e);
+   }
}
 
+   int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeHistogram(dos, entry.getKey());
+   try {
+   serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numHistograms++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize histogram.", e);
+   }
}
 
+   int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeMeter(dos, entry.getKey());
+   try {
+   serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numMeters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize meter.", e);
+   }
}
-   return baos.toByteArray();
+   return new MetricSerializationResult(buffer.getCopyOfBuffer(), 

[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834520#comment-15834520
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97316717
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
+   public final byte[] data;
+   public final int numCounters;
+   public final int numGauges;
+   public final int numMeters;
+   public final int numHistograms;
+   
+   public MetricSerializationResult(byte[] data, int numCounters, int numGauges, int numMeters, int numHistograms) {
--- End diff --

Let's decrease the visibility of the constructor as much as possible.



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834516#comment-15834516
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97317481
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List<MetricDump> deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
+   DataInputView in = new DataInputDeserializer(data.data, 0, data.data.length);
 
-   List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms);
+   List<MetricDump> metrics = new ArrayList<>(data.numCounters + data.numGauges + data.numHistograms + data.numMeters);
 
-   for (int x = 0; x < numCounters; x++) {
-   metrics.add(deserializeCounter(dis));
+   for (int x = 0; x < data.numCounters; x++) {
+   try {
+   metrics.add(deserializeCounter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numGauges; x++) {
-   metrics.add(deserializeGauge(dis));
+   for (int x = 0; x < data.numGauges; x++) {
+   try {
+   metrics.add(deserializeGauge(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numHistograms; x++) {
-   metrics.add(deserializeHistogram(dis));
+   for (int x = 0; x < data.numHistograms; x++) {
+   try {
+   metrics.add(deserializeHistogram(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
 
-   for (int x = 0; x < numMeters; x++) {
-   metrics.add(deserializeMeter(dis));
+   for (int x = 0; x < data.numMeters; x++) {
+   try {
+   metrics.add(deserializeMeter(in));
+   } catch (Exception e) {
+   LOG.warn("Failed to deserialize counter.", e);
+   }
}
-
-   return metrics;
}
}
 
-   private static String deserializeString(DataInputStream dis) throws IOException {
-   int stringLength = dis.readInt();
-   byte[] bytes = new byte[stringLength];
-   dis.readFully(bytes);
-   return new String(bytes);
-   }
 
-   private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException {
+   private static MetricDump.CounterDump deserializeCounter(DataInputView dis) throws IOException {
QueryScopeInfo scope = deserializeMetricInfo(dis);
-   String name = deserializeString(dis);
-   return new MetricDump.CounterDump(scope, name, dis.readLong());
+   String name = dis.readUTF();
+   long count = dis.readLong();
+   return new MetricDump.CounterDump(scope, name, count);
--- End diff --

Should we add an empty line before the `return`s?
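The new `deserializeCounter` in the diff above replaces the hand-rolled `deserializeString` (an `int` length followed by raw bytes decoded with `new String(bytes)`, i.e. the platform default charset) with `readUTF()`. Below is a small sketch of the same record layout (name, then `long` count) written against `java.io.DataOutput`/`java.io.DataInput`, which Flink's `DataOutputSerializer` and `DataInputView` also implement; here the JDK's `DataOutputStream`/`DataInputStream` stand in for them, `QueryScopeInfo` is left out, and `CounterRecordSketch` with its members are hypothetical names. It also uses the blank-line-before-`return` style asked about above. One caveat: `writeUTF` writes a 2-byte length plus modified UTF-8, so it throws `UTFDataFormatException` when the encoded string exceeds 65535 bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical sketch of a counter record written as (name, count). */
public class CounterRecordSketch {

    static final class CounterRecord {
        final String name;
        final long count;

        CounterRecord(String name, long count) {
            this.name = name;
            this.count = count;
        }
    }

    static void serializeCounter(DataOutput out, String name, long count) throws IOException {
        // writeUTF: 2-byte length + modified UTF-8, independent of the platform charset.
        out.writeUTF(name);
        out.writeLong(count);
    }

    static CounterRecord deserializeCounter(DataInput in) throws IOException {
        String name = in.readUTF();
        long count = in.readLong();

        return new CounterRecord(name, count);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        serializeCounter(out, "numRecordsIn", 7L);
        serializeCounter(out, "Z\u00e4hler", 3L); // non-ASCII name survives the round trip
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        CounterRecord first = deserializeCounter(in);
        CounterRecord second = deserializeCounter(in);
        System.out.println(first.name + "=" + first.count);
        System.out.println(second.name + "=" + second.count);
    }
}
```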



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834519#comment-15834519
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97314949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
--- End diff --

Can you add a short class-level comment explaining why we added this, e.g. the 
problem of determining the number of metrics beforehand?



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834512#comment-15834512
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97316605
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -64,122 +80,148 @@ private MetricDumpSerialization() {
 * @param gauges gauges to serialize
 * @param histograms histograms to serialize
 * @return byte array containing the serialized metrics
-* @throws IOException
 */
-   public byte[] serialize(
+   public MetricSerializationResult serialize(
Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException {
-   
-   baos.reset();
-   dos.writeInt(counters.size());
-   dos.writeInt(gauges.size());
-   dos.writeInt(histograms.size());
-   dos.writeInt(meters.size());
+   Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+
+   buffer.clear();
 
+   int numCounters = 0;
for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeCounter(dos, entry.getKey());
+   try {
+   serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numCounters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize counter.", e);
+   }
}
 
+   int numGauges = 0;
for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeGauge(dos, entry.getKey());
+   try {
+   serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numGauges++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize gauge.", e);
+   }
}
 
+   int numHistograms = 0;
for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeHistogram(dos, entry.getKey());
+   try {
+   serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numHistograms++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize histogram.", e);
+   }
}
 
+   int numMeters = 0;
for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
-   serializeMetricInfo(dos, entry.getValue().f0);
-   serializeString(dos, entry.getValue().f1);
-   serializeMeter(dos, entry.getKey());
+   try {
+   serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+   numMeters++;
+   } catch (Exception e) {
+   LOG.warn("Failed to serialize meter.", e);
--- End diff --

Should we decrease the log level to `debug` (here and on the other lines)? The 
user won't be able to act on the warning, and in most cases this should work and 
we don't 

[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834515#comment-15834515
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97316951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -191,62 +233,65 @@ private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOE
 *
 * @param data serialized metrics
 * @return A list containing the deserialized metrics.
-* @throws IOException
 */
-   public List<MetricDump> deserialize(byte[] data) throws IOException {
-   ByteArrayInputStream bais = new ByteArrayInputStream(data);
-   DataInputStream dis = new DataInputStream(bais);
 
-   int numCounters = dis.readInt();
-   int numGauges = dis.readInt();
-   int numHistograms = dis.readInt();
-   int numMeters = dis.readInt();
+   public List<MetricDump> deserialize(MetricDumpSerialization.MetricSerializationResult data) {
--- End diff --

Furthermore, an empty line is missing after that line. Also, there is a stray 
empty line after the JavaDoc of `deserialize`.



[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834518#comment-15834518
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3128#discussion_r97319035
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -50,12 +51,27 @@
private MetricDumpSerialization() {
}
 
+   public static class MetricSerializationResult {
--- End diff --

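This is not `Serializable` and would fail when sent with Akka. The 
following test fails:
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.junit.Test;

@Test
public void testJavaSerialization() throws IOException {
    MetricDumpSerialization.MetricDumpSerializer serializer =
        new MetricDumpSerialization.MetricDumpSerializer();

    final ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
    final ObjectOutputStream oos = new ObjectOutputStream(bos);

    // fails with java.io.NotSerializableException while the result class is not Serializable
    oos.writeObject(serializer.serialize(
        new HashMap<Counter, Tuple2<QueryScopeInfo, String>>(),
        new HashMap<Gauge<?>, Tuple2<QueryScopeInfo, String>>(),
        new HashMap<Histogram, Tuple2<QueryScopeInfo, String>>(),
        new HashMap<Meter, Tuple2<QueryScopeInfo, String>>()));
}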
```
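The failure can be reproduced without Flink: Java serialization, which the test exercises and which is assumed here to be what the result goes through when Akka has to serialize it, rejects any object whose class does not implement `java.io.Serializable`. A minimal sketch; `DumpResult` and `PlainResult` are hypothetical stand-ins, not Flink's `MetricSerializationResult`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableResultSketch {

    /** Hypothetical result type that can be Java-serialized. */
    public static final class DumpResult implements Serializable {
        private static final long serialVersionUID = 1L;

        public final byte[] data;
        public final int numCounters;
        public final int numGauges;

        public DumpResult(byte[] data, int numCounters, int numGauges) {
            this.data = data;
            this.numCounters = numCounters;
            this.numGauges = numGauges;
        }
    }

    /** Same shape, but without Serializable: writeObject rejects it. */
    public static final class PlainResult {
        public final byte[] data;

        public PlainResult(byte[] data) {
            this.data = data;
        }
    }

    /** Returns false if writeObject fails with NotSerializableException. */
    public static boolean canJavaSerialize(Object value) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the object with Java serialization and reads a copy back. */
    public static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }
}
```

`roundTrip(new DumpResult(...))` returns an equivalent copy, while `canJavaSerialize(new PlainResult(...))` returns `false`; the only difference between the two classes is `implements Serializable` (plus a `serialVersionUID`).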




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15828361#comment-15828361
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3128
  
I've had an offline chat with @rmetzger and @uce. We agreed that using a 
ByteBuffer and resizing it manually was a bit undesirable.

Instead we opted for the following approach:
* use DataOutputSerializer instead of DataOutputStream; it is a bit more 
efficient for strings, which make up the majority of the serialized data, and is 
also backed by a resizing array
* restructure the serialize methods to be symmetric with the deserialize 
methods
* access the metric values before serializing anything and reduce them to 
primitives or strings. The assumption is that if this succeeds, the subsequent 
serialization will succeed as well; it can then only fail due to critical errors 
that prevent serialization entirely, or due to programming errors on our part.
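The third point (read every value first, serialize afterwards) can be sketched as follows. This is a minimal illustration under stated assumptions, not the merged code: gauges are modelled as `java.util.function.Supplier<?>` instead of Flink's `Gauge`, `java.io.DataOutputStream` stands in for `DataOutputSerializer`, and `TwoPhaseDumpSketch`, `snapshot` and `serialize` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TwoPhaseDumpSketch {

    /**
     * Phase 1: read every gauge up front and reduce its value to a String.
     * Gauges that are null, return null, or throw are dropped here.
     */
    public static Map<String, String> snapshot(Map<String, Supplier<?>> gauges) {
        Map<String, String> values = new LinkedHashMap<>();
        for (Map.Entry<String, Supplier<?>> entry : gauges.entrySet()) {
            Supplier<?> gauge = entry.getValue();
            if (gauge == null) {
                continue;
            }
            try {
                Object value = gauge.get();
                if (value != null) {
                    values.put(entry.getKey(), value.toString());
                }
            } catch (RuntimeException e) {
                // A misbehaving gauge is omitted instead of corrupting the whole dump.
            }
        }
        return values;
    }

    /** Phase 2: only Strings are written, so the count always matches the entries. */
    public static byte[] serialize(Map<String, String> values) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(values.size());
            for (Map.Entry<String, String> entry : values.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            // Not expected for an in-memory stream, but DataOutputStream declares it.
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the entry count is taken from the already-filtered map, the header always matches the number of entries written, and no user code runs while bytes are being produced.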




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823880#comment-15823880
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3128

[FLINK-5464] Improve MetricDumpSerialization error handling

Rework of #3103.

The key change introduced in the previous PR remains; if a gauge returns 
null it is not serialized.

However, I've extended the PR to harden the entire serialization process 
against exceptions. The major gain here is that a single failed serialization 
no longer destroys the entire dump; instead, the failed metric is simply omitted.

To allow that, I had to replace the `OutputStream`s with a `ByteBuffer`. 
An `OutputStream` doesn't really let you handle failures between serialization 
steps, as you can't reset the stream in any way. The `ByteBuffer` is manually 
resized if a `BufferOverflowException` occurs.

* `MetricDump(De)Serializer#(de)serialize` will no longer throw any 
exceptions, but catch and log them instead
* Exceptions during the serialization of a metric will cause that metric to 
be skipped.
* added test for handling of gauge returning null
* added test for manual resizing of backing array
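The `ByteBuffer` approach described in this PR can be sketched like this. It is a hypothetical, simplified illustration, not the code in PR #3128: each gauge is written as a string name plus a string value, gauges are modelled as `Supplier<?>`, the position before each metric is remembered so a failing metric can be rolled back, the buffer is doubled and the metric retried on `BufferOverflowException`, and the entry count is patched in at offset 0 at the end.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Supplier;

public class ByteBufferDumpSketch {

    public static byte[] serialize(Map<String, Supplier<?>> gauges, int initialCapacity) {
        ByteBuffer buffer = ByteBuffer.allocate(Math.max(initialCapacity, 4));
        buffer.putInt(0); // placeholder for the number of entries actually written
        int written = 0;

        for (Map.Entry<String, Supplier<?>> entry : gauges.entrySet()) {
            while (true) {
                int mark = buffer.position(); // start of this metric's bytes
                try {
                    putString(buffer, entry.getKey());
                    // The gauge is read after its name was written, so a failure here
                    // leaves a partial entry behind (a null gauge fails here via NPE).
                    Object value = entry.getValue().get();
                    if (value == null) {
                        throw new IllegalStateException("gauge returned null");
                    }
                    putString(buffer, value.toString());
                    written++;
                    break;
                } catch (BufferOverflowException e) {
                    buffer.position(mark); // drop the partial entry
                    buffer = grow(buffer); // retry this metric; the gauge is read again
                } catch (RuntimeException e) {
                    buffer.position(mark); // drop the partial entry and skip this metric
                    break;
                }
            }
        }
        buffer.putInt(0, written); // absolute put: patch the count, position unchanged
        return Arrays.copyOf(buffer.array(), buffer.position());
    }

    private static void putString(ByteBuffer buffer, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buffer.putInt(bytes.length);
        buffer.put(bytes);
    }

    /** Doubles the capacity and copies everything written so far. */
    private static ByteBuffer grow(ByteBuffer old) {
        ByteBuffer bigger = ByteBuffer.allocate(old.capacity() * 2);
        old.flip();      // limit = current position, position = 0
        bigger.put(old); // bigger.position() now ends where old ended
        return bigger;
    }
}
```

Note that a retry after an overflow reads the gauge a second time, which is one of the awkward side effects of resizing the buffer by hand.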

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5464_mqs_npe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3128


commit 8610a47c407afe2140cd4b5651ebc794ef3feec8
Author: zentol 
Date:   2017-01-12T11:41:56Z

[FLINK-5464] [metrics] Ignore metrics that are null

commit 442c0a4dee002b73e5b86d6c7bb274484a8900ac
Author: zentol 
Date:   2017-01-16T10:25:58Z

[hotfix] Remove unused variable in MetricDumpSerializerTest

commit 0f813ebf53414b1b68c6dfe8e3e1dbc896054c36
Author: zentol 
Date:   2017-01-12T11:42:26Z

[FLINK-5464] [metrics] Improve MetricDumpSerialization exception handling





[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822472#comment-15822472
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3103
  
yeah I'll have to rethink this :/

I wanted to just ignore gauges returning null (since nothing in the 
web-interface accounts for that case), but did not adjust the count of metrics 
that are submitted. urgh.
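The problem described here, and the `EOFException` reported in the comment on this PR further down, follow from a count-prefixed layout in which the reader trusts the announced number of entries. A minimal sketch with a hypothetical format (an `int` count followed by `writeUTF` strings, not Flink's actual layout); `adjustCount = false` reproduces the mismatch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class CountPrefixSketch {

    /** Writes "count, entries"; null values are skipped. adjustCount=false reproduces the bug. */
    public static byte[] serialize(List<String> values, boolean adjustCount) {
        List<String> kept = new ArrayList<>();
        for (String value : values) {
            if (value != null) {
                kept.add(value);
            }
        }
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            // Correct: announce only what is written. Buggy: announce every input value.
            out.writeInt(adjustCount ? kept.size() : values.size());
            for (String value : kept) {
                out.writeUTF(value);
            }
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads exactly as many entries as the header announces. */
    public static List<String> deserialize(byte[] data) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        try {
            int count = in.readInt();
            List<String> result = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                result.add(in.readUTF()); // runs out of data if the count is too high
            }
            return result;
        } catch (EOFException e) {
            throw new IllegalStateException("dump is shorter than its header claims", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With `adjustCount = true`, serializing `["a", null, "b"]` reads back as `["a", "b"]`; with `false`, the header announces three entries and the reader hits the end of the data on the third.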




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822473#comment-15822473
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/3103




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821920#comment-15821920
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3103
  
While testing this PR, I found that the jobmanager.log is now full of 
exceptions like this one:

```
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeString(MetricDumpSerialization.java:230)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeMetricInfo(MetricDumpSerialization.java:278)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.deserializeGauge(MetricDumpSerialization.java:243)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$800(MetricDumpSerialization.java:47)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpDeserializer.deserialize(MetricDumpSerialization.java:214)
at 
org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:196)
at 
org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
at 
org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
at akka.dispatch.OnSuccess.internal(Future.scala:212)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:25)
at 
scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
at 
scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
at 
java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
```




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821684#comment-15821684
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3103
  
cc @rmetzger 




[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820823#comment-15820823
 ] 

ASF GitHub Bot commented on FLINK-5464:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/3103

[FLINK-5464] [metrics] Prevent some NPEs

This PR prevents some NullPointerExceptions from occurring in the metric 
system.

- When a metric that is null is registered, it is ignored and a 
warning is logged.
  - e.g. `group.counter("counter", null);`
- The MetricDumpSerialization completely ignores gauges if their value is 
null.
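The first bullet amounts to a null guard at registration time. A minimal sketch with a hypothetical, simplified group class; Flink's `MetricGroup` has typed methods such as `counter(...)`, so the single generic `register` method and the log message here are illustrative only:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

public class NullSafeMetricGroupSketch {

    private static final Logger LOG =
        Logger.getLogger(NullSafeMetricGroupSketch.class.getName());

    private final Map<String, Object> metrics = new LinkedHashMap<>();

    /** Registers the metric under the given name, or logs a warning and ignores it if null. */
    public <M> M register(String name, M metric) {
        if (metric == null) {
            LOG.warning("Ignoring registration of metric '" + name + "' because it is null.");
            return null; // nothing is stored, so later dumps never see a null entry
        }
        metrics.put(name, metric);
        return metric;
    }

    public int size() {
        return metrics.size();
    }
}
```

Rejecting the null at registration keeps it out of every later dump, so only the second case (a non-null gauge returning null) still has to be handled during serialization.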

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 5464_mqs_npe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3103


commit 0912848b3ce54842fc6810aa0b041db5547ac690
Author: zentol 
Date:   2017-01-12T11:41:56Z

[FLINK-5464] [metrics] Ignore metrics that are null

commit 941c83a599221fc57c02605e2c3bc348d70aa8b2
Author: zentol 
Date:   2017-01-12T11:42:26Z

[FLINK-5464] [metrics] Prevent Gauge NPE in serialization






[jira] [Commented] (FLINK-5464) MetricQueryService throws NullPointerException on JobManager

2017-01-12 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15820801#comment-15820801
 ] 

Chesnay Schepler commented on FLINK-5464:
-

There are two possible causes for this: either the supplied gauge is null, or it 
is non-null but the value it supplies is null. Neither case is currently 
checked.
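A minimal sketch of guarding against both cases, assuming that a gauge which is null or which supplies null should simply be skipped rather than failing the whole dump. The class name `NullSafeGaugeSerialization` and the `Optional<byte[]>` return are hypothetical; the local `Gauge<T>` interface only mirrors the shape of Flink's `org.apache.flink.metrics.Gauge` (a single `T getValue()`) so the example compiles on its own. This is not the code in `MetricDumpSerialization`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical helper illustrating the two null checks; not Flink's implementation.
public class NullSafeGaugeSerialization {

    // Local stand-in with the same shape as org.apache.flink.metrics.Gauge.
    interface Gauge<T> {
        T getValue();
    }

    /**
     * Serializes the gauge's value as a UTF string, or returns an empty Optional
     * (so the caller can omit this gauge) if the gauge or its value is null.
     */
    static Optional<byte[]> serializeGauge(Gauge<?> gauge) {
        if (gauge == null) {
            return Optional.empty(); // case 1: the supplied gauge is null
        }
        Object value = gauge.getValue();
        if (value == null) {
            return Optional.empty(); // case 2: the gauge exists but supplies null
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // writeUTF emits a 2-byte length prefix followed by the encoded characters
            out.writeUTF(value.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.of(bytes.toByteArray());
    }

    public static void main(String[] args) {
        Gauge<Integer> healthy = () -> 42;
        Gauge<String> nullValued = () -> null;

        System.out.println(serializeGauge(healthy).map(b -> b.length)); // Optional[4]
        System.out.println(serializeGauge(nullValued).isPresent());     // false
        System.out.println(serializeGauge(null).isPresent());           // false
    }
}
```

Returning an empty result instead of throwing lets the caller drop one bad gauge while still answering the query, so a single misbehaving metric would not prevent the reply the `MetricFetcher` is waiting for.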
