flink git commit: [FLINK-2972] [JavaAPI] Remove Chill dependency from flink-java.

2015-11-30 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master d5a6b13ab -> cd899f3be


[FLINK-2972] [JavaAPI] Remove Chill dependency from flink-java.

This closes #1331


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd899f3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd899f3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd899f3b

Branch: refs/heads/master
Commit: cd899f3be39c5f052a2a3e9077d1a7df1d47
Parents: d5a6b13
Author: Fabian Hueske 
Authored: Tue Nov 3 16:57:49 2015 +0100
Committer: Fabian Hueske 
Committed: Mon Nov 30 20:31:42 2015 +0100

--
 flink-core/pom.xml  |  6 -
 flink-java/pom.xml  | 17 +++--
 .../typeutils/runtime/kryo/KryoSerializer.java  | 25 ++--
 flink-runtime/pom.xml   | 12 ++
 flink-scala/pom.xml |  2 +-
 5 files changed, 46 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cd899f3b/flink-core/pom.xml
--
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 6763011..f91608e 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -53,12 +53,6 @@ under the License.

 

-   com.twitter
-   chill_${scala.binary.version}
-   ${chill.version}
-   
-
-   
com.google.guava
guava
${guava.version}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd899f3b/flink-java/pom.xml
--
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 4b8e24c..0b33de1 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -65,13 +65,7 @@ under the License.
 

com.twitter
-   chill_${scala.binary.version}
-   ${chill.version}
-   
-
-   
-   com.twitter
-   
chill-avro_${scala.binary.version}
+   chill-java
${chill.version}

 
@@ -110,6 +104,15 @@ under the License.
test-jar
test

+
+   
+   com.twitter
+   chill_${scala.binary.version}
+   ${chill.version}
+   
+   test
+   
+   

 


http://git-wip-us.apache.org/repos/asf/flink/blob/cd899f3b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 8549e26..b90901c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -26,7 +26,6 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Preconditions;
-import com.twitter.chill.ScalaKryoInstantiator;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -42,6 +41,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -281,9 +282,29 @@ public class KryoSerializer extends TypeSerializer {
 
// 

 
+   private Kryo getKryoInstance() {
+
+   try {
+   // check if ScalaKryoInstantiator is in class path 
(coming from Twitter's Chill library).
+   // This will be true if Flink's Scala API is used.
+   Class chillInstantiatorClazz = 
Class.forName("com.twitter.chill.ScalaKryoInstantiator");
+   Object chillInstantiator = 
chillInstantiatorClazz.newInstance();
+
+   // obtain a Kryo instance through Twitter 

flink git commit: [FLINK-3061] Properly fail Kafka Consumer if broker is not available

2015-11-30 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 3e9d33ee5 -> 209ae6c91


[FLINK-3061] Properly fail Kafka Consumer if broker is not available

This closes #1395


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/209ae6c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/209ae6c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/209ae6c9

Branch: refs/heads/master
Commit: 209ae6c916e1bff7126b074dfe831bbc7b113e4a
Parents: 3e9d33e
Author: Robert Metzger 
Authored: Mon Nov 23 17:57:26 2015 +0100
Committer: Robert Metzger 
Committed: Mon Nov 30 16:04:36 2015 +0100

--
 .../connectors/kafka/FlinkKafkaConsumer.java| 11 +++-
 .../connectors/kafka/KafkaConsumerTestBase.java | 29 
 .../streaming/connectors/kafka/KafkaITCase.java |  5 
 3 files changed, 44 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e42faef..2d1d91a 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -306,6 +306,11 @@ public class FlinkKafkaConsumer extends 
RichParallelSourceFunction
// Connect to a broker to get the partitions
List partitionInfos = 
getPartitionsForTopic(topic, props);
 
+   if (partitionInfos.size() == 0) {
+   throw new RuntimeException("Unable to retrieve any 
partitions for topic " + topic + "." +
+   "Please check previous log entries");
+   }
+
// get initial partitions list. The order of the partitions is 
important for consistent 
// partition id assignment in restart cases.
this.partitions = new int[partitionInfos.size()];
@@ -424,7 +429,11 @@ public class FlinkKafkaConsumer extends 
RichParallelSourceFunction
} finally {
if (offsetCommitter != null) {
offsetCommitter.close();
-   offsetCommitter.join();
+   try {
+   offsetCommitter.join();
+   } catch(InterruptedException ie) {
+   // ignore interrupt
+   }
}
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 48f4c50..2116c01 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -123,6 +123,35 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
//  select which tests to run.
// 

 
+
+   /**
+* Test that ensures the KafkaConsumer is properly failing if the topic 
doesnt exist
+* and a wrong broker was specified
+*
+* @throws Exception
+*/
+   public void runFailOnNoBrokerTest() throws Exception {
+   try {
+   Properties properties = new Properties();
+
+   StreamExecutionEnvironment see = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   see.getConfig().disableSysoutLogging();
+ 

flink git commit: [FLINK-3088] [serialization] Fix copy method of TypeSerializer which use Kryo

2015-11-30 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 209ae6c91 -> 20ebad048


[FLINK-3088] [serialization] Fix copy method of TypeSerializer which use Kryo

Some TypeSerializer, WritableSerializer, ValueSerializer, and AvroSerializer, 
and
comparators, WritableComparator and ValueComparator, use Kryo to copy records.
In case where the Kryo serializer cannot copy the record, the copy method fails.
This is however not necessary, because one can copy the element by serializing
the record to a byte array and deserializing it from this array. This PR adds
this behaviour to the respective classes.

Adds KryoUtils tool with copy method to avoid code duplication

This closes #1415.

Adds comments to KryoUtils functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20ebad04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20ebad04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20ebad04

Branch: refs/heads/master
Commit: 20ebad048d3b4621a6e801a4d24a15d4468f2ba6
Parents: 209ae6c
Author: Till Rohrmann 
Authored: Fri Nov 27 15:14:15 2015 +0100
Committer: Till Rohrmann 
Committed: Mon Nov 30 16:29:22 2015 +0100

--
 .../apache/flink/util/InstantiationUtil.java| 11 ++-
 .../java/typeutils/runtime/AvroSerializer.java  | 30 ++-
 .../api/java/typeutils/runtime/KryoUtils.java   | 87 
 .../java/typeutils/runtime/ValueComparator.java |  9 +-
 .../java/typeutils/runtime/ValueSerializer.java | 12 ++-
 .../typeutils/runtime/WritableComparator.java   |  9 +-
 .../typeutils/runtime/WritableSerializer.java   | 17 +++-
 .../runtime/ValueComparatorUUIDTest.java| 46 +++
 .../api/java/typeutils/runtime/ValueID.java | 72 
 .../runtime/ValueSerializerUUIDTest.java| 50 +++
 .../runtime/WritableComparatorUUIDTest.java | 46 +++
 .../api/java/typeutils/runtime/WritableID.java  | 78 ++
 .../runtime/WritableSerializerTest.java |  1 -
 .../runtime/WritableSerializerUUIDTest.java | 50 +++
 14 files changed, 487 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 8ce3e85..4f6ee32 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -279,9 +279,16 @@ public final class InstantiationUtil {
}
 
InputViewDataInputStreamWrapper inputViewWrapper = new 
InputViewDataInputStreamWrapper(new DataInputStream(new 
ByteArrayInputStream(buf)));
+   return serializer.deserialize(inputViewWrapper);
+   }
+
+   public static  T deserializeFromByteArray(TypeSerializer 
serializer, T reuse, byte[] buf) throws IOException {
+   if (buf == null) {
+   throw new NullPointerException("Byte array to 
deserialize from must not be null.");
+   }
 
-   T record = serializer.createInstance();
-   return serializer.deserialize(record, inputViewWrapper);
+   InputViewDataInputStreamWrapper inputViewWrapper = new 
InputViewDataInputStreamWrapper(new DataInputStream(new 
ByteArrayInputStream(buf)));
+   return serializer.deserialize(reuse, inputViewWrapper);
}

@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 26bf4ce..bc04367 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import 

flink git commit: [tests] Activate ChaosMonkeyITCase

2015-11-30 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 20ebad048 -> 2b358cde0


[tests] Activate ChaosMonkeyITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b358cde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b358cde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b358cde

Branch: refs/heads/master
Commit: 2b358cde0d232a3bfbc59fa9a6845b0c754f1704
Parents: 20ebad0
Author: Till Rohrmann 
Authored: Wed Nov 25 13:05:22 2015 +0100
Committer: Till Rohrmann 
Committed: Mon Nov 30 16:34:26 2015 +0100

--
 .../org/apache/flink/test/recovery/ChaosMonkeyITCase.java   | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2b358cde/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index f536418..3ec8380 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -69,7 +69,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-@Ignore
 public class ChaosMonkeyITCase {
 
private static final Logger LOG = 
LoggerFactory.getLogger(ChaosMonkeyITCase.class);
@@ -135,19 +134,19 @@ public class ChaosMonkeyITCase {
// will be killed. On recovery (which takes some time to bring 
up the new process etc.),
// this test will wait for task managers to reconnect before 
starting the next count down.
// Therefore the delay between retries is not important in this 
setup.
-   final FiniteDuration killEvery = new FiniteDuration(30, 
TimeUnit.SECONDS);
+   final FiniteDuration killEvery = new FiniteDuration(5, 
TimeUnit.SECONDS);
 
// Trigger a checkpoint every
-   final int checkpointingIntervalMs = 2000;
+   final int checkpointingIntervalMs = 1000;
 
// Total number of kills
-   final int totalNumberOfKills = 5;
+   final int totalNumberOfKills = 10;
 
// 
-
 
// Setup
Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
-   ZooKeeper.getConnectString(), 
FileStateBackendBasePath.getPath());
+   ZooKeeper.getConnectString(), 
FileStateBackendBasePath.toURI().toString());
 
// Akka and restart timeouts
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, 
"1000 ms");



[2/7] flink git commit: [FLINK-3056] [web-dashboard] Represent bytes in more readable form.

2015-11-30 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/web/js/index.js
--
diff --git a/flink-runtime-web/web-dashboard/web/js/index.js 
b/flink-runtime-web/web-dashboard/web/js/index.js
index 5947de0..271f82e 100644
--- a/flink-runtime-web/web-dashboard/web/js/index.js
+++ b/flink-runtime-web/web-dashboard/web/js/index.js
@@ -271,18 +271,29 @@ 
angular.module('flinkApp').filter("amDurationFormatExtended", ["angularMomentCon
   return '';
 }
   };
-}).filter("bytes", function() {
-  return function(bytes, precision) {
-var number, units;
-if (isNaN(parseFloat(bytes)) || !isFinite(bytes)) {
-  return "-";
+}).filter("humanizeBytes", function() {
+  return function(bytes) {
+var converter, units;
+units = ["B", "KB", "MB", "GB", "TB", "PB", "EB"];
+converter = function(value, power) {
+  var base;
+  base = Math.pow(1024, power);
+  if (value < base) {
+return (value / base).toFixed(2) + " " + units[power];
+  } else if (value < base * 1000) {
+return (value / base).toPrecision(3) + " " + units[power];
+  } else {
+return converter(value, power + 1);
+  }
+};
+if (typeof bytes === "undefined" || bytes === null) {
+  return "";
 }
-if (typeof precision === "undefined") {
-  precision = 1;
+if (bytes < 1000) {
+  return bytes + " B";
+} else {
+  return converter(bytes, 1);
 }
-units = ["bytes", "kB", "MB", "GB", "TB", "PB"];
-number = Math.floor(Math.log(bytes) / Math.log(1024));
-return (bytes / Math.pow(1024, Math.floor(number))).toFixed(precision) + " 
" + units[number];
   };
 });
 
@@ -1262,4 +1273,4 @@ angular.module('flinkApp').service('TaskManagersService', 
["$http", "flinkConfig
   return this;
 }]);
 
-//# 
sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJzb3VyY2VzIjpbImluZGV4LmNvZmZlZSIsImluZGV4LmpzIiwiY29tbW9uL2RpcmVjdGl2ZXMuY29mZmVlIiwiY29tbW9uL2RpcmVjdGl2ZXMuanMiLCJjb21tb24vZmlsdGVycy5jb2ZmZWUiLCJjb21tb24vZmlsdGVycy5qcyIsImNvbW1vbi9zZXJ2aWNlcy5jb2ZmZWUiLCJjb21tb24vc2VydmljZXMuanMiLCJtb2R1bGVzL2pvYm1hbmFnZXIvam9ibWFuYWdlci5jdHJsLmNvZmZlZSIsIm1vZHVsZXMvam9ibWFuYWdlci9qb2JtYW5hZ2VyLmN0cmwuanMiLCJtb2R1bGVzL2pvYm1hbmFnZXIvam9ibWFuYWdlci5zdmMuY29mZmVlIiwibW9kdWxlcy9qb2JtYW5hZ2VyL2pvYm1hbmFnZXIuc3ZjLmpzIiwibW9kdWxlcy9qb2JzL2pvYnMuY3RybC5jb2ZmZWUiLCJtb2R1bGVzL2pvYnMvam9icy5jdHJsLmpzIiwibW9kdWxlcy9qb2JzL2pvYnMuZGlyLmNvZmZlZSIsIm1vZHVsZXMvam9icy9qb2JzLmRpci5qcyIsIm1vZHVsZXMvam9icy9qb2JzLnN2Yy5jb2ZmZWUiLCJtb2R1bGVzL2pvYnMvam9icy5zdmMuanMiLCJtb2R1bGVzL292ZXJ2aWV3L292ZXJ2aWV3LmN0cmwuY29mZmVlIiwibW9kdWxlcy9vdmVydmlldy9vdmVydmlldy5jdHJsLmpzIiwibW9kdWxlcy9vdmVydmlldy9vdmVydmlldy5zdmMuY29mZmVlIiwibW9kdWxlcy9vdmVydmlldy9vdmVydmlldy5zdmMuanMiLCJtb2R1bGVzL3Rhc2ttYW5hZ2VyL3Rhc2ttYW5
 
hZ2VyLmN0cmwuY29mZmVlIiwibW9kdWxlcy90YXNrbWFuYWdlci90YXNrbWFuYWdlci5jdHJsLmpzIiwibW9kdWxlcy90YXNrbWFuYWdlci90YXNrbWFuYWdlci5zdmMuY29mZmVlIiwibW9kdWxlcy90YXNrbWFuYWdlci90YXNrbWFuYWdlci5zdmMuanMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6IkFBa0JBLFFBQVEsT0FBTyxZQUFZLENBQUMsYUFBYSxrQkFJeEMsbUJBQUksU0FBQyxZQUFEO0VBQ0gsV0FBVyxpQkFBaUI7RUNyQjVCLE9Ec0JBLFdBQVcsY0FBYyxXQUFBO0lBQ3ZCLFdBQVcsaUJBQWlCLENBQUMsV0FBVztJQ3JCeEMsT0RzQkEsV0FBVyxlQUFlOztJQUk3QixNQUFNLGVBQWU7RUFDcEIsb0JBQW9CO0dBS3JCLCtEQUFJLFNBQUMsYUFBYSxhQUFhLGFBQWEsV0FBeEM7RUMzQkgsT0Q0QkEsWUFBWSxhQUFhLEtBQUssU0FBQyxRQUFEO0lBQzVCLFFBQVEsT0FBTyxhQUFhO0lBRTVCLFlBQVk7SUM1QlosT0Q4QkEsVUFBVSxXQUFBO01DN0JSLE9EOEJBLFlBQVk7T0FDWixZQUFZOztJQUtqQixpQ0FBTyxTQUFDLHVCQUFEO0VDaENOLE9EaUNBLHNCQUFzQjtJQUl2QixnREFBTyxTQUFDLGdCQUFnQixvQkFBakI7RUFDTixlQUFlLE1BQU0sWUFDbkI7SUFBQSxLQUFLO0lBQ0wsT0FDRTtNQUFBLE1BQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sZ0JBQ0w7SUFBQSxLQUFLO0lBQ0wsT0FDRTtNQUFBLE1BQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sa0JBQ0w7SUFBQSxLQUFL
 
O0lBQ0wsT0FDRTtNQUFBLE1BQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sY0FDTDtJQUFBLEtBQUs7SUFDTCxVQUFVO0lBQ1YsT0FDRTtNQUFBLE1BQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sbUJBQ0w7SUFBQSxLQUFLO0lBQ0wsVUFBVTtJQUNWLE9BQ0U7TUFBQSxTQUNFO1FBQUEsYUFBYTtRQUNiLFlBQVk7OztLQUVqQixNQUFNLDRCQUNMO0lBQUEsS0FBSztJQUNMLE9BQ0U7TUFBQSxnQkFDRTtRQUFBLGFBQWE7UUFDYixZQUFZOzs7S0FFakIsTUFBTSxnQ0FDTDtJQUFBLEtBQUs7SUFDTCxPQUNFO01BQUEsZ0JBQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sdUJBQ0w7SUFBQSxLQUFLO0lBQ0wsT0FDRTtNQUFBLFNBQ0U7UUFBQSxhQUFhOzs7S0FFbEIsTUFBTSw4QkFDTDtJQUFBLEtBQUs7SUFDTCxPQUNFO01BQUEsUUFDRTtRQUFBLGFBQWE7UUFDYixZQUFZOzs7S0FFakIsTUFBTSx5QkFDTDtJQUFBLEtBQUs7SUFDTCxPQUNFO01BQUEsU0FDRTtRQUFBLGFBQWE7UUFDYixZQUFZOzs7S0FFakIsTUFBTSx5QkFDTDtJQUFBLEtBQUs7SUFDTCxPQUNFO01BQUEsU0FDRTtRQUFBLGFBQWE7UUFDYixZQUFZOzs7S0FFakIsTUFBTSxxQkFDTDtJQUFBLEtBQUs7SUFDTCxPQUNFO01BQUEsU0FDRTtRQUFBLGFBQWE7OztLQUVsQixNQUFNLGVBQ0w7SUFBQSxLQUFLO0lBQ0wsT0FDRTtNQUFBLE1BQ0U7UUFBQSxhQUFhO1FBQ2IsWUFBWTs7O0tBRWpCLE1BQU0sa0JBQ
 

[6/7] flink git commit: [FLINK-2904] [web-dashboard] Fix truncation of task counts when the number is large.

2015-11-30 Thread sewen
[FLINK-2904] [web-dashboard] Fix truncation of task counts when the number is 
large.

This closes #1321


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b80ecfdc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b80ecfdc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b80ecfdc

Branch: refs/heads/master
Commit: b80ecfdc211e1e91d6ebc5b79e257901933a047d
Parents: cf91347
Author: Sachin Goel 
Authored: Tue Nov 3 19:07:34 2015 +0530
Committer: Stephan Ewen 
Committed: Mon Nov 30 17:44:12 2015 +0100

--
 flink-runtime-web/web-dashboard/app/styles/job.styl | 5 ++---
 flink-runtime-web/web-dashboard/web/css/index.css   | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b80ecfdc/flink-runtime-web/web-dashboard/app/styles/job.styl
--
diff --git a/flink-runtime-web/web-dashboard/app/styles/job.styl 
b/flink-runtime-web/web-dashboard/app/styles/job.styl
index 1051712..fa21d0e 100644
--- a/flink-runtime-web/web-dashboard/app/styles/job.styl
+++ b/flink-runtime-web/web-dashboard/app/styles/job.styl
@@ -34,9 +34,8 @@
 .label-group
   .label
 display: inline-block
-width: 2em
-padding-left: 0.1em
-padding-right: 0.1em
+padding-left: 0.4em
+padding-right: 0.4em
 margin: 0
 border-right: 1px solid #ff
 border-radius(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/b80ecfdc/flink-runtime-web/web-dashboard/web/css/index.css
--
diff --git a/flink-runtime-web/web-dashboard/web/css/index.css 
b/flink-runtime-web/web-dashboard/web/css/index.css
index 45599ca..5a7c44f 100644
--- a/flink-runtime-web/web-dashboard/web/css/index.css
+++ b/flink-runtime-web/web-dashboard/web/css/index.css
@@ -445,9 +445,8 @@ livechart {
 }
 .label-group .label {
   display: inline-block;
-  width: 2em;
-  padding-left: 0.1em;
-  padding-right: 0.1em;
+  padding-left: 0.4em;
+  padding-right: 0.4em;
   margin: 0;
   border-right: 1px solid #fff;
   -webkit-border-radius: 0;



[7/7] flink git commit: [FLINK-2950] [ml] [docs] Fix markdown rendering problem in SVM documentation

2015-11-30 Thread sewen
[FLINK-2950] [ml] [docs] Fix markdown rendering problem in SVM documentation

  - Remove unnecessary indentation of table
  - Fix wrong `strong` end tag
  - Simplify lambda expression in map operation

This closes #1312


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5a6b13a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5a6b13a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5a6b13a

Branch: refs/heads/master
Commit: d5a6b13ab1ff6f42369b6b1cd2aad73bd6910362
Parents: 8051222
Author: Chiwan Park 
Authored: Sun Nov 1 11:15:26 2015 +0900
Committer: Stephan Ewen 
Committed: Mon Nov 30 17:44:13 2015 +0100

--
 docs/libs/ml/svm.md | 202 +++
 1 file changed, 101 insertions(+), 101 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d5a6b13a/docs/libs/ml/svm.md
--
diff --git a/docs/libs/ml/svm.md b/docs/libs/ml/svm.md
index 89f7e70..c344979 100644
--- a/docs/libs/ml/svm.md
+++ b/docs/libs/ml/svm.md
@@ -87,106 +87,106 @@ the algorithm's performance.
 
 The SVM implementation can be controlled by the following parameters:
 
-   
-
-  
-Parameters
-Description
-  
-
-
-
-  
-Blocks
-
-  
-Sets the number of blocks into which the input data will be split.
-On each block the local stochastic dual coordinate ascent method 
is executed.
-This number should be set at least to the degree of parallelism.
-If no value is specified, then the parallelism of the input 
DataSet is used as the number of blocks.
-(Default value: None)
-  
-
-  
-  
-Iterations
-
-  
-Defines the maximum number of iterations of the outer loop method.
-In other words, it defines how often the SDCA method is applied to 
the blocked data.
-After each iteration, the locally computed weight vector updates 
have to be reduced to update the global weight vector value.
-The new weight vector is broadcast to all SDCA tasks at the 
beginning of each iteration.
-(Default value: 10)
-  
-
-  
-  
-LocalIterations
-
-  
-Defines the maximum number of SDCA iterations.
-In other words, it defines how many data points are drawn from 
each local data block to calculate the stochastic dual coordinate ascent.
-(Default value: 10)
-  
-
-  
-  
-Regularization
-
-  
-Defines the regularization constant of the SVM algorithm.
-The higher the value, the smaller will the 2-norm of the weight 
vector be.
-In case of a SVM with hinge loss this means that the SVM margin 
will be wider even though it might contain some false classifications.
-(Default value: 1.0)
-  
-
-  
-  
-Stepsize
-
-  
-Defines the initial step size for the updates of the weight vector.
-The larger the step size is, the larger will be the contribution 
of the weight vector updates to the next weight vector value.
-The effective scaling of the updates is $\frac{stepsize}{blocks}$.
-This value has to be tuned in case that the algorithm becomes 
unstable.
-(Default value: 1.0)
-  
-
-  
-  
-ThresholdValue
-
-  
-Defines the limiting value for the decision function above which 
examples are labeled as
-positive (+1.0). Examples with a decision function value below 
this value are classified
-as negative (-1.0). In order to get the raw decision function 
values you need to indicate it by
-using the OutputDecisionFunction parameter.  (Default value: 
0.0)
-  
-
-  
-  
-OutputDecisionFunction
-
-  
-Determines whether the predict and evaluate functions of the SVM 
should return the distance
-to the separating hyperplane, or binary class labels. Setting this 
to true will 
-return the raw distance to the hyperplane for each example. 
Setting it to false will 
-return the binary class label (+1.0, -1.0) (Default value: 
false<\strong>)
-  
-
-  
-  
-  Seed
-  
-
-  Defines the seed to initialize the random number generator.
-  The seed directly controls which data points are chosen for the SDCA 
method.
-  (Default value: Random Long Integer)
-
-   

[3/7] flink git commit: [FLINK-3056] [web-dashboard] Represent bytes in more readable form.

2015-11-30 Thread sewen
[FLINK-3056] [web-dashboard] Represent bytes in more readable form.

Bytes are now displayed in the following fashion:
  1. For [0, 1000) units, display three significant digits.
  2. For [1000,1024) units, display 2 decimal points for the next higher unit.
For example, 1010 KB is displayed as 0.99 MB, 10 MB is displayed as 10.0 MB and 
230 MB is displayed as such.

This closes #1419


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80512229
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80512229
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80512229

Branch: refs/heads/master
Commit: 80512229c487537e59688d35ffdfb10dc18a8ac9
Parents: 6bac921
Author: Sachin Goel 
Authored: Sat Nov 28 16:54:23 2015 +0530
Committer: Stephan Ewen 
Committed: Mon Nov 30 17:44:12 2015 +0100

--
 .../jobs/job.plan.node-list.overview.jade   |  4 +--
 .../partials/jobs/job.plan.node.subtasks.jade   |  6 ++--
 .../app/partials/taskmanager/index.jade |  6 ++--
 .../taskmanager/taskmanager.metrics.jade| 33 +---
 .../app/scripts/common/filters.coffee   | 21 -
 flink-runtime-web/web-dashboard/web/js/index.js | 33 +---
 .../jobs/job.plan.node-list.overview.html   |  4 +--
 .../partials/jobs/job.plan.node.subtasks.html   |  4 +--
 .../web/partials/taskmanager/index.html |  6 ++--
 .../taskmanager/taskmanager.metrics.html| 24 +++---
 10 files changed, 85 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
index 54bd29c..ef9257d 100644
--- 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.overview.jade
@@ -39,9 +39,9 @@ table.table.table-body-hover.table-clickable.table-activable
 span(ng-if="v.duration > -1" title="{{v.duration | 
humanizeDuration:false}}") {{v.duration | humanizeDuration:true}}
 
   td.td-long {{ v.name | humanizeText }}
-  td {{ v.metrics['read-bytes'] | number }}
+  td(title="{{v.metrics['read-bytes']}} bytes") {{ v.metrics['read-bytes'] 
| humanizeBytes }}
   td {{ v.metrics['read-records'] | number }}
-  td {{ v.metrics['write-bytes'] | number }}
+  td(title="{{v.metrics['write-bytes']}} bytes") {{ 
v.metrics['write-bytes'] | humanizeBytes }}
   td {{ v.metrics['write-records'] | number }}
   td
 .label-group

http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
index 6350325..259b364 100644
--- 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.subtasks.jade
@@ -43,11 +43,13 @@ 
table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="subta
 span(ng-if="subtask.duration > -1" title="{{subtask.duration | 
humanizeDuration:false}}") {{subtask.duration | humanizeDuration:true}}
 
   td
-span(ng-if="subtask.metrics['read-bytes'] > -1") {{ 
subtask.metrics['read-bytes'] | number }}
+span(ng-if="subtask.metrics['read-bytes'] > -1" 
title="{{subtask.metrics['read-bytes']}} bytes")
+  | {{ subtask.metrics['read-bytes'] | humanizeBytes}}
   td
 span(ng-if="subtask.metrics['read-records'] > -1") {{ 
subtask.metrics['read-records'] | number }}
   td
-span(ng-if="subtask.metrics['write-bytes'] > -1") {{ 
subtask.metrics['write-bytes'] | number }}
+span(ng-if="subtask.metrics['write-bytes'] > -1" 
title="{{subtask.metrics['write-bytes']}} bytes")
+  | {{ subtask.metrics['write-bytes'] | humanizeBytes}}
   td
 span(ng-if="subtask.metrics['write-records'] > -1") {{ 
subtask.metrics['write-records'] | number }}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/taskmanager/index.jade 

[4/7] flink git commit: [FLINK-2351] [core] Remove IOFormat ConfigBuilders

2015-11-30 Thread sewen
[FLINK-2351] [core] Remove IOFormat ConfigBuilders

This closes #1420


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bac9214
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bac9214
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bac9214

Branch: refs/heads/master
Commit: 6bac921445d8fdecc4951bd4b9342bce0994c5ba
Parents: b80ecfd
Author: zentol 
Authored: Sat Nov 28 14:36:06 2015 +0100
Committer: Stephan Ewen 
Committed: Mon Nov 30 17:44:12 2015 +0100

--
 .../api/common/io/DelimitedInputFormat.java | 95 
 .../flink/api/common/io/FileInputFormat.java| 66 --
 .../flink/api/common/io/FileOutputFormat.java   | 50 ---
 .../operators/base/FileDataSourceBase.java  | 28 --
 4 files changed, 239 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6bac9214/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 78c6705..cb32fc3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.operators.base.FileDataSourceBase;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -616,98 +615,4 @@ public abstract class DelimitedInputFormat extends 
FileInputFormat {
 * The configuration key to set the number of samples to take for the 
statistics.
 */
private static final String NUM_STATISTICS_SAMPLES = 
"delimited-format.numSamples";
-   
-   // --- Config Builder 
-
-   
-   /**
-* Creates a configuration builder that can be used to set the input 
format's parameters to the config in a fluent
-* fashion.
-* 
-* @return A config builder for setting parameters.
-*/
-   public static ConfigBuilder 
configureDelimitedFormat(FileDataSourceBase target) {
-   return new ConfigBuilder(target.getParameters());
-   }
-   
-   /**
-* Abstract builder used to set parameters to the input format's 
configuration in a fluent way.
-*/
-   protected static class AbstractConfigBuilder extends 
FileInputFormat.AbstractConfigBuilder {
-   
-   private static final String NEWLINE_DELIMITER = "\n";
-   
-   // 

-   
-   /**
-* Creates a new builder for the given configuration.
-* 
-* @param config The configuration into which the parameters 
will be written.
-*/
-   protected AbstractConfigBuilder(Configuration config) {
-   super(config);
-   }
-   
-   // 

-   
-   /**
-* Sets the delimiter to be a single character, namely the 
given one. The character must be within
-* the value range 0 to 127.
-* 
-* @param delimiter The delimiter character.
-* @return The builder itself.
-*/
-   public T recordDelimiter(char delimiter) {
-   if (delimiter == '\n') {
-   this.config.setString(RECORD_DELIMITER, 
NEWLINE_DELIMITER);
-   } else {
-   this.config.setString(RECORD_DELIMITER, 
String.valueOf(delimiter));
-   }
-   @SuppressWarnings("unchecked")
-   T ret = (T) this;
-   return ret;
-   }
-   
-   /**
-* Sets the delimiter to be the given string. The string will 
be converted to bytes for more efficient
-* comparison during input parsing. The conversion will be done 
using the platforms default charset.
-* 
-* @param delimiter The delimiter string.
-  

[5/7] flink git commit: [FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance.

2015-11-30 Thread sewen
[FLINK-3083] [docs] Add docs on how to configure streaming fault tolerance.

This closes #1413


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf913476
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf913476
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf913476

Branch: refs/heads/master
Commit: cf913476965051d2ca38f3e95a84246bb7de712e
Parents: 2b358cd
Author: Stephan Ewen 
Authored: Thu Nov 26 16:45:45 2015 +0100
Committer: Stephan Ewen 
Committed: Mon Nov 30 17:44:12 2015 +0100

--
 docs/_includes/navbar.html   |   3 +-
 docs/apis/fault_tolerance.md | 265 ++
 docs/apis/streaming_guide.md | 131 +--
 3 files changed, 268 insertions(+), 131 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cf913476/docs/_includes/navbar.html
--
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index 62bdce8..c565feb 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -81,8 +81,9 @@ under the License.
 Python API Beta
 
 
-Interactive Scala 
Shell
+Fault 
Tolerance
 State in 
Streaming Programs
+Interactive Scala 
Shell
 DataSet 
Transformations
 Best 
Practices
 Connectors 
(DataSet API)

http://git-wip-us.apache.org/repos/asf/flink/blob/cf913476/docs/apis/fault_tolerance.md
--
diff --git a/docs/apis/fault_tolerance.md b/docs/apis/fault_tolerance.md
new file mode 100644
index 000..677ff95
--- /dev/null
+++ b/docs/apis/fault_tolerance.md
@@ -0,0 +1,265 @@
+---
+title: "Fault Tolerance"
+is_beta: false
+---
+
+
+
+
+Flink's fault tolerance mechanism recovers programs in the presence of 
failures and
+continues to execute them. Such failures include machine hardware failures, 
network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Streaming Fault Tolerance (DataStream API)
+--
+
+Flink has a checkpointing mechanism that recovers streaming jobs after 
failues. The checkpointing mechanism requires a *persistent* (or *durable*) 
source that
+can be asked for prior records again (Apache Kafka is a good example of such a 
source).
+
+The checkpointing mechanism stores the progress in the data sources and data 
sinks, the state of windows, as well as the user-defined state (see [Working 
with State]({{ site.baseurl }}/apis/streaming_guide.html#working-with-state)) 
consistently to provide *exactly once* processing semantics. Where the 
checkpoints are stored (e.g., JobManager memory, file system, database) depends 
on the configured [state backend]({{ site.baseurl }}/apis/state_backends.html).
+
+The [docs on streaming fault tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html) describe in detail the technique behind 
Flink's streaming fault tolerance mechanism.
+
+To enable checkpointing, call `enableCheckpointing(n)` on the 
`StreamExecutionEnvironment`, where *n* is the checkpoint interval in 
milliseconds.
+
+Other parameters for checkpointing include:
+
+- *Number of retries*: The `setNumberOfExecutionRerties()` method defines how 
many times the job is restarted after a failure.
+  When checkpointing is activated, but this value is not explicitly set, the 
job is restarted infinitely often.
+
+- *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
+  Exactly-once is preferrable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
+
+- *number of concurrent checkpoints*: By default, the system will not trigger 
another checkpoint while one is still in progress. This ensures that the 
topology does not spend too much time on checkpoints and not make progress with 
processing the streams. It is possible to allow for multiple overlapping 
checkpoints, which is interesting for pipelines that have a certain processing 
delay (for example because the functions call external services that need some 
time to respond) but that still want to do very frequent checkpoints (100s of 
milliseconds) to re-process very little upon failures.
+
+- *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete until then.
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms

[1/7] flink git commit: [FLINK-3056] [web-dashboard] Represent bytes in more readable form.

2015-11-30 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 2b358cde0 -> d5a6b13ab


http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
index daba67d..4d05cc3 100644
--- 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
+++ 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html
@@ -38,9 +38,9 @@ limitations under the License.
   {{ v['end-time'] | 
amDateFormat:'-MM-DD, H:mm:ss' }}
   {{v.duration | humanizeDuration:true}}
   {{ v.name | humanizeText }}
-  {{ v.metrics['read-bytes'] | number }}
+  {{ v.metrics['read-bytes'] 
| humanizeBytes }}
   {{ v.metrics['read-records'] | number }}
-  {{ v.metrics['write-bytes'] | number }}
+  {{ 
v.metrics['write-bytes'] | humanizeBytes }}
   {{ v.metrics['write-records'] | number }}
   
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
index 147b14d..874ed88 100644
--- 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
+++ 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html
@@ -38,9 +38,9 @@ limitations under the License.
   {{ subtask['start-time'] 
| amDateFormat:'-MM-DD, H:mm:ss' }}
   {{ subtask['end-time'] | 
amDateFormat:'-MM-DD, H:mm:ss' }}
   {{subtask.duration | 
humanizeDuration:true}}
-  {{ 
subtask.metrics['read-bytes'] | number }}
+  {{ 
subtask.metrics['read-bytes'] | humanizeBytes}}
   {{ 
subtask.metrics['read-records'] | number }}
-  {{ 
subtask.metrics['write-bytes'] | number }}
+  {{ 
subtask.metrics['write-bytes'] | humanizeBytes}}
   {{ 
subtask.metrics['write-records'] | number }}
   {{ subtask.attempt + 1 }}
   {{ subtask.host }}

http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html 
b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
index bf37409..f56dcbe 100644
--- a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
+++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html
@@ -48,9 +48,9 @@ limitations under the License.
 {{ manager.slotsNumber }}
 {{ manager.freeSlots }}
 {{ manager.cpuCores }}
-{{ manager.physicalMemory | bytes:MB }}
-{{ manager.freeMemory | bytes:MB }}
-{{ manager.managedMemory | bytes:MB }}
+{{ manager.physicalMemory 
| humanizeBytes }}
+{{ manager.freeMemory | 
humanizeBytes }}
+{{ manager.managedMemory | 
humanizeBytes }}
   
 
   

http://git-wip-us.apache.org/repos/asf/flink/blob/80512229/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
 
b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
index 2a8b51e..691490e 100644
--- 
a/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
+++ 
b/flink-runtime-web/web-dashboard/web/partials/taskmanager/taskmanager.metrics.html
@@ -30,21 +30,21 @@ limitations under the License.
   
 
   Heap
-  
-  
-  
+  {{metrics.metrics.gauges['memory.heap.committed'].value | 
humanizeBytes}}
+  {{metrics.metrics.gauges['memory.heap.init'].value | humanizeBytes}}
+  {{metrics.metrics.gauges['memory.heap.max'].value | humanizeBytes}}
 
 
   Non-Heap
-  
-  
-  
+  {{metrics.metrics.gauges['memory.non-heap.committed'].value | 
humanizeBytes}}
+  {{metrics.metrics.gauges['memory.non-heap.init'].value | 
humanizeBytes}}
+  {{metrics.metrics.gauges['memory.non-heap.max'].value | 
humanizeBytes}}
 
 
   Total
-  
-  
-  
+  {{metrics.metrics.gauges['memory.total.committed'].value | 
humanizeBytes}}
+  {{metrics.metrics.gauges['memory.total.init'].value | 
humanizeBytes}}
+  {{metrics.metrics.gauges['memory.total.max'].value | humanizeBytes}}
 
   
 
@@ -66,9 +66,9 @@ limitations under the