[FLINK-3931] Implement Transport Encryption (SSL/TLS)

Enabled SSL support for the different network transport within flink.
This feature is documented as task T3 in the "Secure Data Access in Flink" 
design doc - 
https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs

This commit addresses subtasks - FLINK-4324, FLINK-4325, FLINK-4404 and 
FLINK-4405
For details about how to use this feature, refer to security-ssl.md and the new 
config values added to config.md

This closes #2518


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/069de27d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/069de27d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/069de27d

Branch: refs/heads/master
Commit: 069de27df67faecad741d8ba95ca0ada609e930c
Parents: dffde7e
Author: Suri <suresh.krishna...@gmail.com>
Authored: Sun Oct 16 08:29:15 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Oct 17 11:29:04 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  17 +
 docs/setup/security-ssl.md                      | 139 +++++++
 .../org/apache/flink/client/CliFrontend.java    |   2 +-
 .../flink/client/program/ClusterClient.java     |   4 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   3 +-
 .../flink/configuration/ConfigConstants.java    |  81 ++++-
 .../webmonitor/RuntimeMonitorHandler.java       |   8 +-
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  10 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  51 ++-
 .../files/StaticFileServerHandler.java          |  27 +-
 .../handlers/HandlerRedirectUtils.java          |   5 +-
 .../webmonitor/handlers/JarRunHandler.java      |   9 +-
 .../handlers/JobManagerConfigHandler.java       |  13 +-
 .../handlers/TaskManagerLogHandler.java         |  38 +-
 .../BackPressureStatsTrackerITCase.java         |   3 +-
 .../StackTraceSampleCoordinatorITCase.java      |   3 +-
 .../web-dashboard/app/partials/submit.jade      |   2 +-
 .../web-dashboard/web/partials/submit.html      |   4 +-
 .../apache/flink/runtime/blob/BlobCache.java    |  17 +-
 .../apache/flink/runtime/blob/BlobClient.java   |  63 +++-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 +-
 .../apache/flink/runtime/client/JobClient.java  |  13 +-
 .../client/JobSubmissionClientActor.java        |  15 +-
 .../runtime/io/network/netty/NettyClient.java   |  57 ++-
 .../runtime/io/network/netty/NettyConfig.java   |  42 ++-
 .../runtime/io/network/netty/NettyServer.java   |  18 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |  13 +-
 .../org/apache/flink/runtime/net/SSLUtils.java  | 185 ++++++++++
 .../flink/runtime/util/StandaloneUtils.java     |  10 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  87 ++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  22 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   2 +
 .../flink/runtime/taskmanager/TaskManager.scala |  10 +-
 .../runtime/blob/BlobCacheRetriesTest.java      |   4 +-
 .../runtime/blob/BlobCacheSuccessTest.java      |   5 +-
 .../flink/runtime/blob/BlobClientSslTest.java   | 360 +++++++++++++++++++
 .../flink/runtime/blob/BlobClientTest.java      |  21 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   4 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  16 +-
 .../flink/runtime/blob/BlobServerGetTest.java   |   4 +-
 .../flink/runtime/blob/BlobServerPutTest.java   |  12 +-
 .../runtime/client/JobClientActorTest.java      |  14 +-
 .../BlobLibraryCacheManagerTest.java            |   7 +-
 .../BlobLibraryCacheRecoveryITCase.java         |   4 +-
 .../network/netty/NettyClientServerSslTest.java | 160 +++++++++
 .../JobManagerProcessReapingTest.java           |   1 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |  15 +-
 .../ZooKeeperLeaderRetrievalTest.java           |   7 +-
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 128 +++++++
 .../runtime/testutils/JobManagerProcess.java    |   1 +
 .../src/test/resources/local127.keystore        | Bin 0 -> 2130 bytes
 .../src/test/resources/local127.truststore      | Bin 0 -> 840 bytes
 .../src/test/resources/untrusted.keystore       | Bin 0 -> 2162 bytes
 .../flink/runtime/akka/AkkaSslITCase.scala      | 129 +++++++
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  22 ++
 .../runtime/testingUtils/TestingUtils.scala     |   4 +-
 .../test/classloading/ClassLoaderITCase.java    |   3 +-
 .../ZooKeeperLeaderElectionITCase.java          |   1 +
 .../flink/yarn/YarnApplicationMasterRunner.java |   9 +-
 59 files changed, 1778 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 0c8f451..9567eb1 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -157,6 +157,8 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `blob.server.port`: Port definition for the blob server (serving user jar's) 
on the Taskmanagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.
 
+- `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server 
communication. This is applicable only when the global ssl flag 
security.ssl.enabled is set to true (DEFAULT: true).
+
 - `restart-strategy`: Default restart strategy to use in case that no restart 
strategy has been specified for the submitted job.
 Currently, it can be chosen from fixed delay restart strategy, failure rate 
restart strategy or no restart strategy.
 To use the fixed delay strategy you have to specify "fixed-delay".
@@ -198,6 +200,7 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 - `taskmanager.hostname`: The hostname of the network interface that the 
TaskManager binds to. By default, the TaskManager searches for network 
interfaces that can connect to the JobManager and other TaskManagers. This 
option can be used to define a hostname if that strategy fails for some reason. 
Because different TaskManagers need different values for this option, it 
usually is specified in an additional non-shared TaskManager-specific config 
file.
 - `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which 
lets the OS choose a free port).
 - `taskmanager.data.port`: The task manager's port used for data exchange 
operations (DEFAULT: **0**, which lets the OS choose a free port).
+- `taskmanager.data.ssl.enabled`: Enable SSL support for the taskmanager data 
transport. This is applicable only when the global ssl flag 
security.ssl.enabled is set to true (DEFAULT: **true**)
 - `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager 
(DEFAULT: **256**).
 - `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers, 
which are the parallel workers of the system. In contrast to Hadoop, Flink runs 
operators (e.g., join, aggregate) and user-defined functions (e.g., Map, 
Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so 
this value should be as large as possible (DEFAULT: **512**). On YARN setups, 
this value is automatically configured to the size of the TaskManager's YARN 
container, minus a certain tolerance value.
 - `taskmanager.numberOfTaskSlots`: The number of parallel operator or user 
function instances that a single TaskManager can run (DEFAULT: **1**). If this 
value is larger than 1, a single TaskManager takes multiple instances of a 
function or operator. That way, the TaskManager can utilize multiple CPU cores, 
but at the same time, the available memory is divided between the different 
operator or function instances. This value is typically proportional to the 
number of physical CPU cores that the TaskManager's machine has (e.g., equal to 
the number of cores, or half the number of cores).
@@ -228,6 +231,19 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 - `akka.throughput`: Number of messages that are processed in a batch before 
returning the thread to the pool. Low values denote a fair scheduling whereas 
high values can increase the performance at the cost of unfairness (DEFAULT: 
**15**).
 - `akka.log.lifecycle.events`: Turns on the Akka's remote logging of events. 
Set this value to 'true' in case of debugging (DEFAULT: **false**).
 - `akka.startup-timeout`: Timeout after which the startup of a remote 
component is considered being failed (DEFAULT: **akka.ask.timeout**).
+- `akka.ssl.enabled`: Turns on SSL for Akka's remote communication. This is 
applicable only when the global ssl flag security.ssl.enabled is set to true 
(DEFAULT: **true**).
+
+### SSL Settings
+
+- `security.ssl.enabled`: Turns on SSL for internal network communication. 
This can be optionally overridden by flags defined in different transport 
modules (DEFAULT: **false**).
+- `security.ssl.keystore`: The Java keystore file to be used by the flink 
endpoint for its SSL Key and Certificate.
+- `security.ssl.keystore-password`: The secret to decrypt the keystore file.
+- `security.ssl.key-password`: The secret to decrypt the server key in the 
keystore.
+- `security.ssl.truststore`: The truststore file containing the public CA 
certificates to be used by flink endpoints to verify the peer's certificate.
+- `security.ssl.truststore-password`: The secret to decrypt the truststore.
+- `security.ssl.protocol`: The SSL protocol version to be supported for the 
ssl transport (DEFAULT: **TLSv1.2**).
+- `security.ssl.algorithms`: The comma separated list of standard SSL 
algorithms to be supported. Read more 
[here](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites)
 (DEFAULT: **TLS_RSA_WITH_AES_128_CBC_SHA**).
+- `security.ssl.verify-hostname`: Flag to enable peer's hostname verification 
during ssl handshake (DEFAULT: **true**).
 
 ### JobManager Web Frontend
 
@@ -239,6 +255,7 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 - `jobmanager.web.backpressure.refresh-interval`: Time after which available 
stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
 - `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to 
take to determine back pressure (DEFAULT: `100`).
 - `jobmanager.web.backpressure.delay-between-samples`: Delay between stack 
trace samples to determine back pressure (DEFAULT: `50`, 50 ms).
+- `jobmanager.web.ssl.enabled`: Enable https access to the web frontend. This 
is applicable only when the global ssl flag security.ssl.enabled is set to true 
(DEFAULT: `true`).
 
 ### File Systems
 

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/docs/setup/security-ssl.md
----------------------------------------------------------------------
diff --git a/docs/setup/security-ssl.md b/docs/setup/security-ssl.md
new file mode 100644
index 0000000..aa99a0a
--- /dev/null
+++ b/docs/setup/security-ssl.md
@@ -0,0 +1,139 @@
+---
+title: "SSL Setup"
+nav-parent_id: setup
+nav-pos: 8
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page provides instructions on how to enable SSL for the network 
communication between different flink components.
+
+## SSL Configuration
+
+SSL can be enabled for all network communication between flink components. SSL 
keystores and truststore has to be deployed on each flink node and configured 
(conf/flink-conf.yaml) using keys in the security.ssl.* namespace (Please see 
the [configuration page](config.html) for details). SSL can be selectively 
enabled/disabled for different transports using the following flags. These 
flags are only applicable when security.ssl.enabled is set to true.
+
+* **taskmanager.data.ssl.enabled**: SSL flag for data communication between 
task managers
+* **blob.service.ssl.enabled**: SSL flag for blob service client/server 
communication
+* **akka.ssl.enabled**: SSL flag for the akka based control connection between 
the flink client, jobmanager and taskmanager 
+* **jobmanager.web.ssl.enabled**: Flag to enable https access to the 
jobmanager's web frontend
+
+## Deploying Keystores and Truststores
+
+You need to have a Java Keystore generated and copied to each node in the 
flink cluster. The common name or subject alternative names in the certificate 
should match the node's hostname and IP address. Keystores and truststores can 
be generated using the keytool utility 
(https://docs.oracle.com/javase/8/docs/technotes/tools/unix/keytool.html). All 
flink components should have read access to the keystore and truststore files.
+
+### Example: Creating self signed CA and keystores for a 2 node cluster
+
+Execute the following keytool commands to create a truststore with a self 
signed CA
+~~~
+keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" 
-storepass password -keypass password -keyalg RSA -ext bc=ca:true
+keytool -keystore ca.keystore -storepass password -alias ca -exportcert > 
ca.cer
+keytool -importcert -keystore ca.truststore -alias ca -storepass password 
-noprompt -file ca.cer
+~~~
+
+Now create keystores for each node with certificates signed by the above CA. 
Let node1.company.org and node2.company.org be the hostnames with IPs 
192.168.1.1 and 192.168.1.2 respectively
+
+**Node 1**
+~~~
+keytool -genkeypair -alias node1 -keystore node1.keystore -dname 
"CN=node1.company.org" -ext SAN=dns:node1.company.org,ip:192.168.1.1 -storepass 
password -keypass password -keyalg RSA
+keytool -certreq -keystore node1.keystore -storepass password -alias node1 
-file node1.csr
+keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext 
SAN=dns:node1.company.org,ip:192.168.1.1 -infile node1.csr -outfile node1.cer
+keytool -importcert -keystore node1.keystore -storepass password -file ca.cer 
-alias ca -noprompt
+keytool -importcert -keystore node1.keystore -storepass password -file 
node1.cer -alias node1 -noprompt
+~~~
+
+**Node 2**
+~~~
+keytool -genkeypair -alias node2 -keystore node2.keystore -dname 
"CN=node2.company.org" -ext SAN=dns:node2.company.org,ip:192.168.1.2 -storepass 
password -keypass password -keyalg RSA
+keytool -certreq -keystore node2.keystore -storepass password -alias node2 
-file node2.csr
+keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext 
SAN=dns:node2.company.org,ip:192.168.1.2 -infile node2.csr -outfile node2.cer
+keytool -importcert -keystore node2.keystore -storepass password -file ca.cer 
-alias ca -noprompt
+keytool -importcert -keystore node2.keystore -storepass password -file 
node2.cer -alias node2 -noprompt
+~~~
+
+## Standalone Deployment
+Configure each node in the standalone cluster to pick up the keystore and 
truststore files present in the local file system.
+
+### Example: 2 node cluster
+* Generate 2 keystores, one for each node, and copy them to the filesystem on 
the respective node. Also copy the pulic key of the CA (which was used to sign 
the certificates in the keystore) as a Java truststore on both the nodes
+* Configure conf/flink-conf.yaml to pick up these files
+
+#### Node 1
+~~~
+security.ssl.enabled: true
+security.ssl.keystore: /usr/local/node1.keystore
+security.ssl.keystore-password: abc123
+security.ssl.key-password: abc123
+security.ssl.truststore: /usr/local/ca.truststore
+security.ssl.truststore-password: abc123
+~~~
+
+#### Node 2
+~~~
+security.ssl.enabled: true
+security.ssl.keystore: /usr/local/node2.keystore
+security.ssl.keystore-password: abc123
+security.ssl.key-password: abc123
+security.ssl.truststore: /usr/local/ca.truststore
+security.ssl.truststore-password: abc123
+~~~
+
+* Restart the flink components to enable SSL for all of flink's internal 
communication
+* Verify by accessing the jobmanager's UI using https url. The task manager's 
path in the UI should show akka.ssl.tcp:// as the protocol
+* The blob server and task manager's data communication can be verified from 
the log files
+
+## YARN Deployment
+The keystores and truststore can be deployed in a YARN setup in multiple ways 
depending on the cluster setup. Following are 2 ways to achieve this
+
+### 1. Deploy keystores before starting the YARN session
+The keystores and truststore should be generated and deployed on all nodes in 
the YARN setup where flink components can potentially be executed. The same 
flink config file from the flink YARN client is used for all the flink 
components running in the YARN cluster. Therefore we need to ensure the 
keystore is deployed and accessible using the same filepath in all the YARN 
nodes.
+
+#### Example config
+~~~
+security.ssl.enabled: true
+security.ssl.keystore: /usr/local/node.keystore
+security.ssl.keystore-password: abc123
+security.ssl.key-password: abc123
+security.ssl.truststore: /usr/local/ca.truststore
+security.ssl.truststore-password: abc123
+~~~
+
+Now you can start the YARN session from the CLI like you would normally do.
+
+### 2. Use YARN cli to deploy the keystores and truststore
+We can use the YARN client's ship files option (-yt) to distribute the 
keystores and truststore. Since the same keystore will be deployed at all 
nodes, we need to ensure a single certificate in the keystore can be served for 
all nodes. This can be done by either using the Subject Alternative Name(SAN) 
extension in the certificate and setting it to cover all nodes (hostname and ip 
addresses) in the cluster or by using wildcard subdomain names (if the cluster 
is setup accordingly). 
+
+**Example**
+* Supply the following parameters to the keytool command when generating the 
keystore: -ext 
SAN=dns:node1.company.org,ip:192.168.1.1,dns:node2.company.org,ip:192.168.1.2
+* Copy the keystore and the CA's truststore into a local directory (at the 
cli's working directory), say deploy-keys/
+* Update the configuration to pick up the files from a relative path
+~~~
+security.ssl.enabled: true
+security.ssl.keystore: deploy-keys/node.keystore
+security.ssl.keystore-password: password
+security.ssl.key-password: password
+security.ssl.truststore: deploy-keys/ca.truststore
+security.ssl.truststore-password: password
+~~~
+* Start the YARN session using the -yt parameter
+~~~
+flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar
+~~~
+
+When deployed using YARN, flink's web dashboard is accessible through YARN 
proxy's Tracking URL. To ensure that the YARN proxy is able to access flink's 
https url you need to configure YARN proxy to accept flink's SSL certificates. 
Add the custom CA certificate into Java's default trustore on the YARN Proxy 
node.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 0572dc6..8b445e6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -768,7 +768,7 @@ public class CliFrontend {
 
                                                logAndSysout("Uploading JAR 
files.");
                                                LOG.debug("JAR files: " + 
libPaths);
-                                               blobKeys = 
BlobClient.uploadJarFiles(jobManager, clientTimeout, libPaths);
+                                               blobKeys = 
BlobClient.uploadJarFiles(jobManager, clientTimeout, config, libPaths);
                                                LOG.debug("Blob keys: " + 
blobKeys.toString());
                                        }
                                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 5be6650..25bcadc 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -404,7 +404,7 @@ public abstract class ClusterClient {
 
                try {
                        logAndSysout("Submitting job with JobID: " + 
jobGraph.getJobID() + ". Waiting for job completion.");
-                       this.lastJobExecutionResult = 
JobClient.submitJobAndWait(actorSystemLoader.get(),
+                       this.lastJobExecutionResult = 
JobClient.submitJobAndWait(actorSystemLoader.get(), flinkConfig,
                                leaderRetrievalService, jobGraph, timeout, 
printStatusDuringExecution, classLoader);
                        return this.lastJobExecutionResult;
                } catch (JobExecutionException e) {
@@ -432,7 +432,7 @@ public abstract class ClusterClient {
 
                try {
                        logAndSysout("Submitting Job with JobID: " + 
jobGraph.getJobID() + ". Returning after job submission.");
-                       JobClient.submitJobDetached(jobManagerGateway, 
jobGraph, timeout, classLoader);
+                       JobClient.submitJobDetached(jobManagerGateway, 
flinkConfig, jobGraph, timeout, classLoader);
                        return new JobSubmissionResult(jobGraph.getJobID());
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index f4bcfb7..9f47d60 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -332,7 +332,8 @@ public class FlinkClient {
                        throw new RuntimeException("Could not start actor 
system to communicate with JobManager", e);
                }
 
-               return JobManager.getJobManagerActorRef(new 
InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+               return 
JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
+                               new InetSocketAddress(this.jobManagerHost, 
this.jobManagerPort),
                                actorSystem, 
AkkaUtils.getLookupTimeout(configuration));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index e608eb3..9061e87 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -157,6 +157,9 @@ public final class ConfigConstants {
         */
        public static final String BLOB_SERVER_PORT = "blob.server.port";
 
+       /** Flag to override ssl support for the blob service transport */
+       public static final String BLOB_SERVICE_SSL_ENABLED = 
"blob.service.ssl.enabled";
+
        /**
         * The config parameter defining the cleanup interval of the library 
cache manager.
         */
@@ -178,6 +181,11 @@ public final class ConfigConstants {
        public static final String TASK_MANAGER_DATA_PORT_KEY = 
"taskmanager.data.port";
 
        /**
+        * Config parameter to override SSL support for taskmanager's data 
transport
+        */
+       public static final String TASK_MANAGER_DATA_SSL_ENABLED = 
"taskmanager.data.ssl.enabled";
+
+       /**
         * The config parameter defining the directories for temporary files.
         */
        public static final String TASK_MANAGER_TMP_DIR_KEY = 
"taskmanager.tmp.dirs";
@@ -544,6 +552,11 @@ public final class ConfigConstants {
        public static final String JOB_MANAGER_WEB_PORT_KEY = 
"jobmanager.web.port";
 
        /**
+        * Config parameter to override SSL support for the JobManager Web UI
+        */
+       public static final String JOB_MANAGER_WEB_SSL_ENABLED = 
"jobmanager.web.ssl.enabled";
+
+       /**
         * The config parameter defining the flink web directory to be used by 
the webmonitor.
         */
        public static final String JOB_MANAGER_WEB_TMPDIR_KEY = 
"jobmanager.web.tmpdir";
@@ -628,6 +641,11 @@ public final class ConfigConstants {
        public static final String AKKA_TCP_TIMEOUT = "akka.tcp.timeout";
 
        /**
+        * Override SSL support for the Akka transport
+        */
+       public static final String AKKA_SSL_ENABLED = "akka.ssl.enabled";
+
+       /**
         * Maximum framesize of akka messages
         */
        public static final String AKKA_FRAMESIZE = "akka.framesize";
@@ -662,6 +680,40 @@ public final class ConfigConstants {
         */
        public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = 
"akka.jvm-exit-on-fatal-error";
        
+       // ----------------------------- Transport SSL 
Settings--------------------
+
+       /**
+        * Enable SSL support
+        */
+       public static final String SECURITY_SSL_ENABLED = 
"security.ssl.enabled";
+
+       /** The Java keystore file containing the flink endpoint key and 
certificate */
+       public static final String SECURITY_SSL_KEYSTORE = 
"security.ssl.keystore";
+
+       /** secret to decrypt the keystore file */
+       public static final String SECURITY_SSL_KEYSTORE_PASSWORD = 
"security.ssl.keystore-password";
+
+       /** secret to decrypt the server key */
+       public static final String SECURITY_SSL_KEY_PASSWORD = 
"security.ssl.key-password";
+
+       /** The truststore file containing the public CA certificates to verify 
the ssl peers */
+       public static final String SECURITY_SSL_TRUSTSTORE = 
"security.ssl.truststore";
+
+       /** Secret to decrypt the truststore */
+       public static final String SECURITY_SSL_TRUSTSTORE_PASSWORD = 
"security.ssl.truststore-password";
+
+       /** SSL protocol version to be supported */
+       public static final String SECURITY_SSL_PROTOCOL = 
"security.ssl.protocol";
+
+       /**
+        * The standard SSL algorithms to be supported
+        * More options here - 
http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites
+        * */
+       public static final String SECURITY_SSL_ALGORITHMS = 
"security.ssl.algorithms";
+
+       /** Flag to enable/disable hostname verification for the ssl 
connections */
+       public static final String SECURITY_SSL_VERIFY_HOSTNAME = 
"security.ssl.verify-hostname";
+
        // ----------------------------- Streaming 
--------------------------------
        
        /**
@@ -931,6 +983,11 @@ public final class ConfigConstants {
        public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
        /**
+        * The default value to override ssl support for blob service transport
+        */
+       public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true;
+
+       /**
         * Default number of retries for failed BLOB fetches.
         */
        public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
@@ -963,6 +1020,11 @@ public final class ConfigConstants {
        public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 0;
 
        /**
+        * The default value to override ssl support for task manager's data 
transport
+        */
+       public static final boolean DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED = 
true;
+
+       /**
         * The default directory for temporary files of the task manager.
         */
        public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
System.getProperty("java.io.tmpdir");
@@ -1135,7 +1197,10 @@ public final class ConfigConstants {
        /** The config key for the port of the JobManager web frontend.
         * Setting this value to {@code -1} disables the web frontend. */
        public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
-       
+
+       /** Default value to override SSL support for the JobManager web UI */
+       public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
+
        /** The default number of archived jobs for the jobmanager */
        public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
 
@@ -1181,7 +1246,19 @@ public final class ConfigConstants {
        public static String DEFAULT_AKKA_LOOKUP_TIMEOUT = "10 s";
 
        public static String DEFAULT_AKKA_CLIENT_TIMEOUT = "60 s";
-       
+
+       public static boolean DEFAULT_AKKA_SSL_ENABLED = true;
+
+       // ----------------------------- SSL Values 
--------------------------------
+
+       public static boolean DEFAULT_SECURITY_SSL_ENABLED = false;
+
+       public static String DEFAULT_SECURITY_SSL_PROTOCOL = "TLSv1.2";
+
+       public static String DEFAULT_SECURITY_SSL_ALGORITHMS = 
"TLS_RSA_WITH_AES_128_CBC_SHA";
+
+       public static boolean DEFAULT_SECURITY_SSL_VERIFY_HOSTNAME = true;
+
        // ----------------------------- Streaming Values 
--------------------------
        
        public static String DEFAULT_STATE_BACKEND = "jobmanager";

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index b02e556..5008a8c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -67,8 +67,9 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        RequestHandler handler,
                        JobManagerRetriever retriever,
                        Future<String> localJobManagerAddressFuture,
-                       FiniteDuration timeout) {
-               super(retriever, localJobManagerAddressFuture, timeout);
+                       FiniteDuration timeout,
+                       boolean httpsEnabled) {
+               super(retriever, localJobManagerAddressFuture, timeout, 
httpsEnabled);
                this.handler = checkNotNull(handler);
        }
 
@@ -89,7 +90,8 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        }
 
                        InetSocketAddress address = (InetSocketAddress) 
ctx.channel().localAddress();
-                       queryParams.put(WEB_MONITOR_ADDRESS_KEY, 
address.getHostName() + ":" + address.getPort());
+                       queryParams.put(WEB_MONITOR_ADDRESS_KEY,
+                               (httpsEnabled ? "https://"; : "http://";) + 
address.getHostName() + ":" + address.getPort());
 
                        String result = handler.handleRequest(pathParams, 
queryParams, jobManager);
                        byte[] bytes = result.getBytes(ENCODING);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
index 182b5f9..9442867 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -50,16 +50,21 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
 
        protected final FiniteDuration timeout;
 
+       /** Whether the web service has https enabled */
+       protected final boolean httpsEnabled;
+
        protected String localJobManagerAddress;
        
        public RuntimeMonitorHandlerBase(
                JobManagerRetriever retriever,
                Future<String> localJobManagerAddressFuture,
-               FiniteDuration timeout) {
+               FiniteDuration timeout,
+               boolean httpsEnabled) {
 
                this.retriever = checkNotNull(retriever);
                this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
                this.timeout = checkNotNull(timeout);
+               this.httpsEnabled = httpsEnabled;
        }
 
        @Override
@@ -77,7 +82,8 @@ public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHand
                                        localJobManagerAddress, gatewayPort);
 
                                if (redirectAddress != null) {
-                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
+                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path(),
+                                               httpsEnabled);
                                        KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
                                }
                                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 4dd36e7..3e2634f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -28,11 +28,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
@@ -79,6 +82,8 @@ import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -118,6 +123,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private final Router router;
 
+       private final SSLContext serverSSLContext;
+
        private final ServerBootstrap bootstrap;
 
        private final Promise<String> jobManagerAddressPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
@@ -213,6 +220,22 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
 
+               // Config to enable https access to the web-ui
+               boolean enableSSL = config.getBoolean(
+                               ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
+                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) &&
+                       SSLUtils.getSSLEnabled(config);
+
+               if (enableSSL) {
+                       LOG.info("Enabling ssl for the web frontend");
+                       try {
+                               serverSSLContext = 
SSLUtils.createSSLServerContext(config);
+                       } catch (Exception e) {
+                               throw new IOException("Failed to initialize 
SSLContext for the web frontend", e);
+                       }
+               } else {
+                       serverSSLContext = null;
+               }
                metricFetcher = new MetricFetcher(actorSystem, retriever, 
context);
 
                router = new Router()
@@ -260,17 +283,21 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", 
-                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, 
config))
+                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
+                                       TaskManagerLogHandler.FileMode.LOG, 
config, enableSSL))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
-                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, 
TaskManagerLogHandler.FileMode.STDOUT, config))
+                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
+                                       TaskManagerLogHandler.FileMode.STDOUT, 
config, enableSSL))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagerMetricsHandler(metricFetcher)))
 
                        // log and stdout
                        .GET("/jobmanager/log", logFiles.logFile == null ? new 
ConstantTextHandler("(log file unavailable)") :
-                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.logFile))
+                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.logFile,
+                                       enableSSL))
 
                        .GET("/jobmanager/stdout", logFiles.stdOutFile == null 
? new ConstantTextHandler("(stdout file unavailable)") :
-                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
+                               new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile,
+                                       enableSSL))
 
                        .GET("/jobmanager/metrics", handler(new 
JobManagerMetricsHandler(metricFetcher)))
 
@@ -295,7 +322,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                                .GET("/jars/:jarid/plan", handler(new 
JarPlanHandler(uploadDir)))
 
                                // run a jar
-                               .POST("/jars/:jarid/run", handler(new 
JarRunHandler(uploadDir, timeout)))
+                               .POST("/jars/:jarid/run", handler(new 
JarRunHandler(uploadDir, timeout, config)))
 
                                // upload a jar
                                .POST("/jars/upload", handler(new 
JarUploadHandler(uploadDir)))
@@ -311,7 +338,8 @@ public class WebRuntimeMonitor implements WebMonitor {
                }
 
                // this handler serves all the static contents
-               router.GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, webRootDir));
+               router.GET("/:*", new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, webRootDir,
+                       enableSSL));
 
                // add shutdown hook for deleting the directories and remaining 
temp files on shutdown
                try {
@@ -335,8 +363,16 @@ public class WebRuntimeMonitor implements WebMonitor {
                        protected void initChannel(SocketChannel ch) {
                                Handler handler = new Handler(router);
 
+                               // SSL should be the first handler in the 
pipeline
+                               if (serverSSLContext != null) {
+                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       sslEngine.setUseClientMode(false);
+                                       ch.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                               }
+
                                ch.pipeline()
                                                .addLast(new HttpServerCodec())
+                                               .addLast(new 
ChunkedWriteHandler())
                                                .addLast(new 
HttpRequestHandler(uploadDir))
                                                .addLast(handler.name(), 
handler)
                                                .addLast(new 
PipelineErrorHandler(LOG));
@@ -454,7 +490,8 @@ public class WebRuntimeMonitor implements WebMonitor {
        //  Utilities
        // 
------------------------------------------------------------------------
        private RuntimeMonitorHandler handler(RequestHandler handler) {
-               return new RuntimeMonitorHandler(handler, retriever, 
jobManagerAddressPromise.future(), timeout);
+               return new RuntimeMonitorHandler(handler, retriever, 
jobManagerAddressPromise.future(), timeout,
+                       serverSSLContext !=  null);
        }
 
        File getBaseDir(Configuration configuration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index d98471c..5a8c76f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -36,6 +36,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
@@ -43,6 +44,8 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
 import io.netty.util.CharsetUtil;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
@@ -120,6 +123,9 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
        /** The path in which the static documents are */
        private final File rootPath;
 
+       /** Whether the web service has https enabled */
+       private final boolean httpsEnabled;
+
        /** The log for all error reporting */
        private final Logger logger;
 
@@ -129,9 +135,10 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                        JobManagerRetriever retriever,
                        Future<String> localJobManagerAddressPromise,
                        FiniteDuration timeout,
-                       File rootPath) throws IOException {
+                       File rootPath,
+                       boolean httpsEnabled) throws IOException {
 
-               this(retriever, localJobManagerAddressPromise, timeout, 
rootPath, DEFAULT_LOGGER);
+               this(retriever, localJobManagerAddressPromise, timeout, 
rootPath, httpsEnabled, DEFAULT_LOGGER);
        }
 
        public StaticFileServerHandler(
@@ -139,12 +146,14 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                        Future<String> localJobManagerAddressFuture,
                        FiniteDuration timeout,
                        File rootPath,
+                       boolean httpsEnabled,
                        Logger logger) throws IOException {
 
                this.retriever = checkNotNull(retriever);
                this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
                this.timeout = checkNotNull(timeout);
                this.rootPath = checkNotNull(rootPath).getCanonicalFile();
+               this.httpsEnabled = httpsEnabled;
                this.logger = checkNotNull(logger);
        }
 
@@ -180,7 +189,8 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                                        localJobManagerAddress, 
jobManager.get());
 
                                if (redirectAddress != null) {
-                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, requestPath);
+                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(
+                                               redirectAddress, requestPath, 
httpsEnabled);
                                        KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
                                }
                                else {
@@ -304,8 +314,15 @@ public class StaticFileServerHandler extends 
SimpleChannelInboundHandler<Routed>
                ctx.write(response);
 
                // write the content.
-               ctx.write(new DefaultFileRegion(raf.getChannel(), 0, 
fileLength), ctx.newProgressivePromise());
-               ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+               ChannelFuture lastContentFuture;
+               if (ctx.pipeline().get(SslHandler.class) == null) {
+                       ctx.write(new DefaultFileRegion(raf.getChannel(), 0, 
fileLength), ctx.newProgressivePromise());
+                       lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+               } else {
+                       lastContentFuture = ctx.writeAndFlush(new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+                               ctx.newProgressivePromise());
+                       // HttpChunkedInput will write the end marker 
(LastHttpContent) for us.
+               }
 
                // close the connection, if no keep-alive is needed
                if (!HttpHeaders.isKeepAlive(request)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
index 5db8505..21a0f8c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java
@@ -79,11 +79,12 @@ public class HandlerRedirectUtils {
                return null;
        }
 
-       public static HttpResponse getRedirectResponse(String redirectAddress, 
String path) throws Exception {
+       public static HttpResponse getRedirectResponse(String redirectAddress, 
String path, boolean httpsEnabled) throws Exception {
                checkNotNull(redirectAddress, "Redirect address");
                checkNotNull(path, "Path");
 
-               String newLocation = String.format("http://%s%s";, 
redirectAddress, path);
+               String protocol = httpsEnabled ? "https" : "http";
+               String newLocation = String.format("%s://%s%s", protocol, 
redirectAddress, path);
 
                HttpResponse redirectResponse = new DefaultFullHttpResponse(
                                HttpVersion.HTTP_1_1, 
HttpResponseStatus.TEMPORARY_REDIRECT);

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index b9e773a..18b0f15 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -38,10 +39,12 @@ import java.util.Map;
 public class JarRunHandler extends JarActionHandler {
 
        private final FiniteDuration timeout;
+       private final Configuration clientConfig;
 
-       public JarRunHandler(File jarDirectory, FiniteDuration timeout) {
+       public JarRunHandler(File jarDirectory, FiniteDuration timeout, 
Configuration clientConfig) {
                super(jarDirectory);
                this.timeout = timeout;
+               this.clientConfig = clientConfig;
        }
 
        @Override
@@ -49,13 +52,13 @@ public class JarRunHandler extends JarActionHandler {
                try {
                        Tuple2<JobGraph, ClassLoader> graph = 
getJobGraphAndClassLoader(pathParams, queryParams);
                        try {
-                               graph.f0.uploadUserJars(jobManager, timeout);
+                               graph.f0.uploadUserJars(jobManager, timeout, 
clientConfig);
                        } catch (IOException e) {
                                throw new ProgramInvocationException("Failed to 
upload jar files to the job manager", e);
                        }
 
                        try {
-                               JobClient.submitJobDetached(jobManager, 
graph.f0, timeout, graph.f1);
+                               JobClient.submitJobDetached(jobManager, 
clientConfig, graph.f0, timeout, graph.f1);
                        } catch (JobExecutionException e) {
                                throw new ProgramInvocationException("Failed to 
submit the job to the job manager", e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index fce5b92..6d9f7e1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -45,7 +45,18 @@ public class JobManagerConfigHandler implements 
RequestHandler {
                for (String key : config.keySet()) {
                        gen.writeStartObject();
                        gen.writeStringField("key", key);
-                       gen.writeStringField("value", config.getString(key, 
null));
+
+                       // Mask key values which contain sensitive information
+                       if(key.toLowerCase().contains("password")) {
+                               String value = config.getString(key, null);
+                               if(value != null) {
+                                       value = "******";
+                               }
+                               gen.writeStringField("value", value);
+                       }
+                       else {
+                               gen.writeStringField("value", 
config.getString(key, null));
+                       }
                        gen.writeEndObject();
                }
                gen.writeEndArray();

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 2f0d438..5497402 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -37,11 +37,14 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.DefaultFileRegion;
 import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
 import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
@@ -114,8 +117,9 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                Future<String> localJobManagerAddressPromise,
                FiniteDuration timeout,
                FileMode fileMode,
-               Configuration config) throws IOException {
-               super(retriever, localJobManagerAddressPromise, timeout);
+               Configuration config,
+               boolean httpsEnabled) throws IOException {
+               super(retriever, localJobManagerAddressPromise, timeout, 
httpsEnabled);
 
                this.executor = checkNotNull(executor);
                this.config = config;
@@ -233,16 +237,30 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                                                ctx.write(response);
 
                                                // write the content.
-                                               ctx.write(new 
DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
-                                                       .addListener(new 
GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+                                               ChannelFuture lastContentFuture;
+                                               final 
GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> 
completionListener =
+                                                       new 
GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
                                                                @Override
-                                                               public void 
operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws 
Exception {
-                                                                       
lastRequestPending.remove(taskManagerID);
-                                                                       
fc.close();
-                                                                       
raf.close();
+                                                               public void 
operationComplete(
+                                                                       
io.netty.util.concurrent.Future<? super Void> future) throws Exception {
+                                                                               
lastRequestPending.remove(taskManagerID);
+                                                                               
fc.close();
+                                                                               
raf.close();
                                                                }
-                                                       });
-                                               ChannelFuture lastContentFuture 
= ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+                                                       };
+                                               if 
(ctx.pipeline().get(SslHandler.class) == null) {
+                                                       ctx.write(
+                                                               new 
DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
+                                                                       
.addListener(completionListener);
+                                                       lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+                                               } else {
+                                                       lastContentFuture = 
ctx.writeAndFlush(
+                                                               new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+                                                               
ctx.newProgressivePromise())
+                                                                       
.addListener(completionListener);
+                                                       // HttpChunkedInput 
will write the end marker (LastHttpContent) for us.
+                                               }
 
                                                // close the connection, if no 
keep-alive is needed
                                                if 
(!HttpHeaders.isKeepAlive(request)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 507c977..cb95526 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -121,7 +121,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                        try {
                                jobManger = 
TestingUtils.createJobManager(testActorSystem, new Configuration());
 
-                               Configuration config = new Configuration();
+                               final Configuration config = new 
Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 
                                taskManager = TestingUtils.createTaskManager(
@@ -138,6 +138,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                                                        // Submit the job and 
wait until it is running
                                                        
JobClient.submitJobDetached(
                                                                        jm,
+                                                                       config,
                                                                        
jobGraph,
                                                                        
deadline,
                                                                        
ClassLoader.getSystemClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index c4ce9d1..8ae30c6 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -91,7 +91,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        try {
                                jobManger = 
TestingUtils.createJobManager(testActorSystem, new Configuration());
 
-                               Configuration config = new Configuration();
+                               final Configuration config = new 
Configuration();
                                
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 
                                taskManager = TestingUtils.createTaskManager(
@@ -112,6 +112,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                                                // Submit the 
job and wait until it is running
                                                                
JobClient.submitJobDetached(
                                                                                
jm,
+                                                                               
config,
                                                                                
jobGraph,
                                                                                
deadline,
                                                                                
ClassLoader.getSystemClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/web-dashboard/app/partials/submit.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/submit.jade 
b/flink-runtime-web/web-dashboard/app/partials/submit.jade
index 7fdea88..45547a0 100644
--- a/flink-runtime-web/web-dashboard/app/partials/submit.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/submit.jade
@@ -30,7 +30,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main
     | Yarn's AM proxy doesn't allow file uploads. Please wait while we fetch 
an alternate url for you to use
   pre(ng-if="address")
     | Yarn's AM proxy doesn't allow file uploads. You can visit&nbsp;
-    a(href="http://{{address}}/#/submit";)
+    a(href="{{address}}/#/submit")
       | here
     | &nbsp;to access this functionality.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime-web/web-dashboard/web/partials/submit.html
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/partials/submit.html 
b/flink-runtime-web/web-dashboard/web/partials/submit.html
index 942b066..59df26e 100644
--- a/flink-runtime-web/web-dashboard/web/partials/submit.html
+++ b/flink-runtime-web/web-dashboard/web/partials/submit.html
@@ -26,7 +26,7 @@ limitations under the License.
 </div>
 <div id="content-inner" ng-if="!noaccess &amp;&amp; yarn">
   <pre ng-if="!address">Yarn's AM proxy doesn't allow file uploads. Please 
wait while we fetch an alternate url for you to use</pre>
-  <pre ng-if="address">Yarn's AM proxy doesn't allow file uploads. You can 
visit&nbsp;<a href="http://{{address}}/#/submit";>here</a>&nbsp;to access this 
functionality.</pre>
+  <pre ng-if="address">Yarn's AM proxy doesn't allow file uploads. You can 
visit&nbsp;<a href="{{address}}/#/submit">here</a>&nbsp;to access this 
functionality.</pre>
 </div>
 <div id="content-inner" ng-if="!noaccess &amp;&amp; jars &amp;&amp; !yarn">
   <div class="panel panel-default">
@@ -106,4 +106,4 @@ limitations under the License.
       </tr>
     </tbody>
   </table>
-</div>
\ No newline at end of file
+</div>

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index bb0aacb..3f93652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -56,21 +56,26 @@ public final class BlobCache implements BlobService {
        /** The number of retries when the transfer fails */
        private final int numFetchRetries;
 
+       /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+       private final Configuration blobClientConfig;
 
-       public BlobCache(InetSocketAddress serverAddress, Configuration 
configuration) {
-               if (serverAddress == null || configuration == null) {
+
+       public BlobCache(InetSocketAddress serverAddress, Configuration 
blobClientConfig) {
+               if (serverAddress == null || blobClientConfig == null) {
                        throw new NullPointerException();
                }
 
                this.serverAddress = serverAddress;
 
+               this.blobClientConfig = blobClientConfig;
+
                // configure and create the storage directory
-               String storageDirectory = 
configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+               String storageDirectory = 
blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
                // configure the number of fetch retries
-               final int fetchRetries = configuration.getInteger(
+               final int fetchRetries = blobClientConfig.getInteger(
                        ConfigConstants.BLOB_FETCH_RETRIES_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
                if (fetchRetries >= 0) {
                        this.numFetchRetries = fetchRetries;
@@ -121,7 +126,7 @@ public final class BlobCache implements BlobService {
                                        OutputStream os = null;
 
                                        try {
-                                               bc = new 
BlobClient(serverAddress);
+                                               bc = new 
BlobClient(serverAddress, blobClientConfig);
                                                is = bc.get(requiredBlob);
                                                os = new 
FileOutputStream(localJarFile);
 
@@ -245,7 +250,7 @@ public final class BlobCache implements BlobService {
 
        @Override
        public BlobClient createClient() throws IOException {
-               return new BlobClient(serverAddress);
+               return new BlobClient(serverAddress, blobClientConfig);
        }
 
        public File getStorageDir() {

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index fa4c08c..2748967 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +35,9 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLSocket;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -68,22 +74,52 @@ public final class BlobClient implements Closeable {
        private static final Logger LOG = 
LoggerFactory.getLogger(BlobClient.class);
 
        /** The socket connection to the BLOB server. */
-       private final Socket socket;
+       private Socket socket;
 
        /**
         * Instantiates a new BLOB client.
         * 
         * @param serverAddress
         *        the network address of the BLOB server
+        * @param clientConfig
+        *        additional configuration like SSL parameters required to 
connect to the blob server
         * @throws IOException
         *         thrown if the connection to the BLOB server could not be 
established
         */
-       public BlobClient(InetSocketAddress serverAddress) throws IOException {
-               this.socket = new Socket();
+       public BlobClient(InetSocketAddress serverAddress, Configuration 
clientConfig) throws IOException {
+
                try {
-                       this.socket.connect(serverAddress);
+                       // Check if ssl is enabled
+                       SSLContext clientSSLContext = null;
+                       if (clientConfig != null &&
+                               
clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
+                                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+
+                               clientSSLContext = 
SSLUtils.createSSLClientContext(clientConfig);
+                       }
+
+                       if (clientSSLContext != null) {
+
+                               LOG.info("Using ssl connection to the blob 
server");
+
+                               SSLSocket sslSocket = (SSLSocket) 
clientSSLContext.getSocketFactory().createSocket(
+                                       serverAddress.getAddress(),
+                                       serverAddress.getPort());
+
+                               // Enable hostname verification for remote SSL 
connections
+                               if 
(!serverAddress.getAddress().isLoopbackAddress()) {
+                                       SSLParameters newSSLParameters = 
sslSocket.getSSLParameters();
+                                       
SSLUtils.setSSLVerifyHostname(clientConfig, newSSLParameters);
+                                       
sslSocket.setSSLParameters(newSSLParameters);
+                               }
+                               this.socket = sslSocket;
+                       } else {
+                               this.socket = new Socket();
+                               this.socket.connect(serverAddress);
+                       }
+
                }
-               catch(IOException e) {
+               catch(Exception e) {
                        BlobUtils.closeSilently(socket, LOG);
                        throw new IOException("Could not connect to BlobServer 
at address " + serverAddress, e);
                }
@@ -671,14 +707,16 @@ public final class BlobClient implements Closeable {
         * Retrieves the {@link BlobServer} address from the JobManager and 
uploads
         * the JAR files to it.
         *
-        * @param jobManager Server address of the {@link BlobServer}
-        * @param askTimeout Ask timeout for blob server address retrieval
-        * @param jars       List of JAR files to upload
+        * @param jobManager   Server address of the {@link BlobServer}
+        * @param askTimeout   Ask timeout for blob server address retrieval
+        * @param clientConfig Any additional configuration for the blob client
+        * @param jars         List of JAR files to upload
         * @throws IOException Thrown if the address retrieval or upload fails
         */
        public static List<BlobKey> uploadJarFiles(
                        ActorGateway jobManager,
                        FiniteDuration askTimeout,
+                       Configuration clientConfig,
                        List<Path> jars) throws IOException {
 
                if (jars.isEmpty()) {
@@ -692,13 +730,14 @@ public final class BlobClient implements Closeable {
                                Object result = Await.result(futureBlobPort, 
askTimeout);
                                if (result instanceof Integer) {
                                        int port = (Integer) result;
+                                       LOG.info("Blob client connecting to " + 
jobManager.path());
 
                                        Option<String> jmHost = 
jobManager.actor().path().address().host();
                                        String jmHostname = jmHost.isDefined() 
? jmHost.get() : "localhost";
                                        InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, port);
 
                                        // Now, upload
-                                       return uploadJarFiles(serverAddress, 
jars);
+                                       return uploadJarFiles(serverAddress, 
clientConfig, jars);
                                } else {
                                        throw new Exception("Expected port 
number (int) as answer, received " + result);
                                }
@@ -712,16 +751,18 @@ public final class BlobClient implements Closeable {
         * Uploads the JAR files to a {@link BlobServer} at the given address.
         *
         * @param serverAddress Server address of the {@link BlobServer}
+        * @param clientConfig Any additional configuration for the blob client
         * @param jars List of JAR files to upload
         * @throws IOException Thrown if the upload fails
         */
-       public static List<BlobKey> uploadJarFiles(InetSocketAddress 
serverAddress, List<Path> jars) throws IOException {
+       public static List<BlobKey> uploadJarFiles(InetSocketAddress 
serverAddress,
+                       Configuration clientConfig, List<Path> jars) throws 
IOException {
                if (jars.isEmpty()) {
                        return Collections.emptyList();
                } else {
                        List<BlobKey> blobKeys = new ArrayList<>();
 
-                       try (BlobClient blobClient = new 
BlobClient(serverAddress)) {
+                       try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
                                for (final Path jar : jars) {
                                        final FileSystem fs = 
jar.getFileSystem();
                                        FSDataInputStream is = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ff54b67..b800500 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -24,10 +24,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -60,6 +62,12 @@ public class BlobServer extends Thread implements 
BlobService {
        /** The server socket listening for incoming connections. */
        private final ServerSocket serverSocket;
 
+       /** The SSL server context if ssl is enabled for the connections */
+       private SSLContext serverSSLContext = null;
+
+       /** Blob Server configuration */
+       private final Configuration blobServiceConfiguration;
+
        /** Indicates whether a shutdown of server component has been 
requested. */
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
@@ -92,6 +100,8 @@ public class BlobServer extends Thread implements 
BlobService {
 
                HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
 
+               this.blobServiceConfiguration = config;
+
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
@@ -133,6 +143,15 @@ public class BlobServer extends Thread implements 
BlobService {
                        this.shutdownHook = null;
                }
 
+               if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
+                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
+                       try {
+                               serverSSLContext = 
SSLUtils.createSSLServerContext(config);
+                       } catch (Exception e) {
+                               throw new IOException("Failed to initialize 
SSLContext for the blob server", e);
+                       }
+               }
+
                //  ----------------------- start the server -------------------
 
                String serverPortRange = 
config.getString(ConfigConstants.BLOB_SERVER_PORT, 
ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
@@ -143,7 +162,12 @@ public class BlobServer extends Thread implements 
BlobService {
                ServerSocket socketAttempt = 
NetUtils.createSocketFromPorts(ports, new NetUtils.SocketFactory() {
                        @Override
                        public ServerSocket createSocket(int port) throws 
IOException {
-                               return new ServerSocket(port, finalBacklog);
+                               if (serverSSLContext == null) {
+                                       return new ServerSocket(port, 
finalBacklog);
+                               } else {
+                                       LOG.info("Enabling ssl for the blob 
server");
+                                       return 
serverSSLContext.getServerSocketFactory().createServerSocket(port, 
finalBacklog);
+                               }
                        }
                });
 
@@ -320,7 +344,8 @@ public class BlobServer extends Thread implements 
BlobService {
 
        @Override
        public BlobClient createClient() throws IOException {
-               return new BlobClient(new 
InetSocketAddress(serverSocket.getInetAddress(), getPort()));
+               return new BlobClient(new 
InetSocketAddress(serverSocket.getInetAddress(), getPort()),
+                       blobServiceConfiguration);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 7e7ed9e..88e7cfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -68,10 +68,10 @@ public class JobClient {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(JobClient.class);
 
-
        public static ActorSystem startJobClientActorSystem(Configuration 
config)
                        throws IOException {
                LOG.info("Starting JobClient actor system");
+
                Option<Tuple2<String, Object>> remoting = new Some<>(new 
Tuple2<String, Object>("", 0));
 
                // start a remote actor system to listen on an arbitrary port
@@ -95,6 +95,7 @@ public class JobClient {
         */
        public static JobListeningContext submitJob(
                        ActorSystem actorSystem,
+                       Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
                        JobGraph jobGraph,
                        FiniteDuration timeout,
@@ -113,7 +114,8 @@ public class JobClient {
                Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        leaderRetrievalService,
                        timeout,
-                       sysoutLogUpdates);
+                       sysoutLogUpdates,
+                       config);
 
                ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
 
@@ -348,6 +350,7 @@ public class JobClient {
         * case a [[JobExecutionException]] is thrown.
         *
         * @param actorSystem The actor system that performs the communication.
+        * @param config The cluster wide configuration.
         * @param leaderRetrievalService Leader retrieval service which used to 
find the current leading
         *                               JobManager
         * @param jobGraph    JobGraph describing the Flink job
@@ -360,6 +363,7 @@ public class JobClient {
         */
        public static JobExecutionResult submitJobAndWait(
                        ActorSystem actorSystem,
+                       Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
                        JobGraph jobGraph,
                        FiniteDuration timeout,
@@ -368,6 +372,7 @@ public class JobClient {
 
                JobListeningContext jobListeningContext = submitJob(
                                actorSystem,
+                               config,
                                leaderRetrievalService,
                                jobGraph,
                                timeout,
@@ -382,11 +387,13 @@ public class JobClient {
         * JobManager and waits for the answer whether the job could be started 
or not.
         *
         * @param jobManagerGateway Gateway to the JobManager which will 
execute the jobs
+        * @param config The cluster wide configuration.
         * @param jobGraph The job
         * @param timeout  Timeout in which the JobManager must have responded.
         */
        public static void submitJobDetached(
                        ActorGateway jobManagerGateway,
+                       Configuration config,
                        JobGraph jobGraph,
                        FiniteDuration timeout,
                        ClassLoader classLoader) throws JobExecutionException {
@@ -397,7 +404,7 @@ public class JobClient {
 
                LOG.info("Checking and uploading JAR files");
                try {
-                       jobGraph.uploadUserJars(jobManagerGateway, timeout);
+                       jobGraph.uploadUserJars(jobManagerGateway, timeout, 
config);
                }
                catch (IOException e) {
                        throw new JobSubmissionException(jobGraph.getJobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 2cc4a50..a3fee21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -23,6 +23,7 @@ import akka.actor.Props;
 import akka.actor.Status;
 import akka.dispatch.Futures;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -47,12 +48,16 @@ public class JobSubmissionClientActor extends 
JobClientActor {
        private JobGraph jobGraph;
        /** true if a SubmitJobSuccess message has been received */
        private boolean jobSuccessfullySubmitted = false;
+       /** The cluster configuration */
+       private final Configuration clientConfig;
 
        public JobSubmissionClientActor(
                        LeaderRetrievalService leaderRetrievalService,
                        FiniteDuration timeout,
-                       boolean sysoutUpdates) {
+                       boolean sysoutUpdates,
+                       Configuration clientConfig) {
                super(leaderRetrievalService, timeout, sysoutUpdates);
+               this.clientConfig = clientConfig;
        }
 
 
@@ -140,7 +145,7 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                                LOG.info("Upload jar files to job manager {}.", 
jobManager.path());
 
                                try {
-                                       
jobGraph.uploadUserJars(jobManagerGateway, timeout);
+                                       
jobGraph.uploadUserJars(jobManagerGateway, timeout, clientConfig);
                                } catch (IOException exception) {
                                        getSelf().tell(
                                                decorateMessage(new 
JobManagerMessages.JobResultFailure(
@@ -182,11 +187,13 @@ public class JobSubmissionClientActor extends 
JobClientActor {
        public static Props createActorProps(
                        LeaderRetrievalService leaderRetrievalService,
                        FiniteDuration timeout,
-                       boolean sysoutUpdates) {
+                       boolean sysoutUpdates,
+                       Configuration clientConfig) {
                return Props.create(
                        JobSubmissionClientActor.class,
                        leaderRetrievalService,
                        timeout,
-                       sysoutUpdates);
+                       sysoutUpdates,
+                       clientConfig);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 7e80fb8..9b0bb00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -28,11 +28,15 @@ import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
 import java.io.IOException;
-import java.net.SocketAddress;
+import java.net.InetSocketAddress;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -42,8 +46,12 @@ class NettyClient {
 
        private final NettyConfig config;
 
+       private NettyProtocol protocol;
+
        private Bootstrap bootstrap;
 
+       private SSLContext clientSSLContext = null;
+
        NettyClient(NettyConfig config) {
                this.config = config;
        }
@@ -51,6 +59,8 @@ class NettyClient {
        void init(final NettyProtocol protocol, NettyBufferPool 
nettyBufferPool) throws IOException {
                checkState(bootstrap == null, "Netty client has already been 
initialized.");
 
+               this.protocol = protocol;
+
                long start = System.currentTimeMillis();
 
                bootstrap = new Bootstrap();
@@ -99,16 +109,11 @@ class NettyClient {
                        bootstrap.option(ChannelOption.SO_RCVBUF, 
receiveAndSendBufferSize);
                }
 
-               // 
--------------------------------------------------------------------
-               // Child channel pipeline for accepted connections
-               // 
--------------------------------------------------------------------
-
-               bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                       @Override
-                       public void initChannel(SocketChannel channel) throws 
Exception {
-                               
channel.pipeline().addLast(protocol.getClientChannelHandlers());
-                       }
-               });
+               try {
+                       clientSSLContext = config.createClientSSLContext();
+               } catch (Exception e) {
+                       throw new IOException("Failed to initialize SSL Context 
for the Netty client", e);
+               }
 
                long end = System.currentTimeMillis();
                LOG.info("Successful initialization (took {} ms).", (end - 
start));
@@ -158,9 +163,37 @@ class NettyClient {
        // Client connections
        // 
------------------------------------------------------------------------
 
-       ChannelFuture connect(SocketAddress serverSocketAddress) {
+       ChannelFuture connect(final InetSocketAddress serverSocketAddress) {
                checkState(bootstrap != null, "Client has not been initialized 
yet.");
 
+               // 
--------------------------------------------------------------------
+               // Child channel pipeline for accepted connections
+               // 
--------------------------------------------------------------------
+
+               bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+                       @Override
+                       public void initChannel(SocketChannel channel) throws 
Exception {
+
+                               // SSL handler should be added first in the 
pipeline
+                               if (clientSSLContext != null) {
+                                       SSLEngine sslEngine = 
clientSSLContext.createSSLEngine(
+                                               
serverSocketAddress.getAddress().getHostAddress(),
+                                               serverSocketAddress.getPort());
+                                       sslEngine.setUseClientMode(true);
+
+                                       // Enable hostname verification for 
remote SSL connections
+                                       if 
(!serverSocketAddress.getAddress().isLoopbackAddress()) {
+                                               SSLParameters newSSLParameters 
= sslEngine.getSSLParameters();
+                                               
config.setSSLVerifyHostname(newSSLParameters);
+                                               
sslEngine.setSSLParameters(newSSLParameters);
+                                       }
+
+                                       channel.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                               }
+                               
channel.pipeline().addLast(protocol.getClientChannelHandlers());
+                       }
+               });
+
                try {
                        return bootstrap.connect(serverSocketAddress);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index c178f2e..7b0da43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
 import java.net.InetAddress;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -204,11 +208,44 @@ public class NettyConfig {
                }
        }
 
+       public SSLContext createClientSSLContext() throws Exception {
+
+               // Create SSL Context from config
+               SSLContext clientSSLContext = null;
+               if (getSSLEnabled()) {
+                       clientSSLContext = 
SSLUtils.createSSLClientContext(config);
+               }
+
+               return clientSSLContext;
+       }
+
+       public SSLContext createServerSSLContext() throws Exception {
+
+               // Create SSL Context from config
+               SSLContext serverSSLContext = null;
+               if (getSSLEnabled()) {
+                       serverSSLContext = 
SSLUtils.createSSLServerContext(config);
+               }
+
+               return serverSSLContext;
+       }
+
+       public boolean getSSLEnabled() {
+               return 
config.getBoolean(ConfigConstants.TASK_MANAGER_DATA_SSL_ENABLED,
+                               
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_SSL_ENABLED)
+                       && SSLUtils.getSSLEnabled(config);
+       }
+
+       public void setSSLVerifyHostname(SSLParameters sslParams) {
+               SSLUtils.setSSLVerifyHostname(config, sslParams);
+       }
+
        @Override
        public String toString() {
                String format = "NettyConfig [" +
                                "server address: %s, " +
                                "server port: %d, " +
+                               "ssl enabled: %s, " +
                                "memory segment size (bytes): %d, " +
                                "transport type: %s, " +
                                "number of server threads: %d (%s), " +
@@ -220,8 +257,9 @@ public class NettyConfig {
                String def = "use Netty's default";
                String man = "manual";
 
-               return String.format(format, serverAddress, serverPort, 
memorySegmentSize,
-                               getTransportType(), getServerNumThreads(), 
getServerNumThreads() == 0 ? def : man,
+               return String.format(format, serverAddress, serverPort, 
getSSLEnabled() ? "true":"false",
+                               memorySegmentSize, getTransportType(), 
getServerNumThreads(),
+                               getServerNumThreads() == 0 ? def : man,
                                getClientNumThreads(), getClientNumThreads() == 
0 ? def : man,
                                getServerConnectBacklog(), 
getServerConnectBacklog() == 0 ? def : man,
                                getClientConnectTimeoutSeconds(), 
getSendAndReceiveBufferSize(),

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index a93e90c..490b53e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -29,9 +29,12 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ThreadFactory;
@@ -51,6 +54,8 @@ class NettyServer {
 
        private ChannelFuture bindFuture;
 
+       private SSLContext serverSSLContext = null;
+
        private InetSocketAddress localAddress;
 
        NettyServer(NettyConfig config) {
@@ -115,6 +120,13 @@ class NettyServer {
                
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 
config.getMemorySegmentSize() + 1);
                
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * 
config.getMemorySegmentSize());
 
+               // SSL related configuration
+               try {
+                       serverSSLContext = config.createServerSSLContext();
+               } catch (Exception e) {
+                       throw new IOException("Failed to initialize SSL Context 
for the Netty Server", e);
+               }
+
                // 
--------------------------------------------------------------------
                // Child channel pipeline for accepted connections
                // 
--------------------------------------------------------------------
@@ -122,6 +134,12 @@ class NettyServer {
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws 
Exception {
+                               if (serverSSLContext != null) {
+                                       SSLEngine sslEngine = 
serverSSLContext.createSSLEngine();
+                                       sslEngine.setUseClientMode(false);
+                                       channel.pipeline().addLast("ssl", new 
SslHandler(sslEngine));
+                               }
+
                                
channel.pipeline().addLast(protocol.getServerChannelHandlers());
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/069de27d/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 942f1e5..1c6e361 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -496,17 +496,20 @@ public class JobGraph implements Serializable {
         *
         * @param serverAddress
         *        the network address of the BLOB server
+        * @param blobClientConfig
+        *        the blob client configuration
         * @throws IOException
         *         thrown if an I/O error occurs during the upload
         */
-       public void uploadRequiredJarFiles(InetSocketAddress serverAddress) 
throws IOException {
+       public void uploadRequiredJarFiles(InetSocketAddress serverAddress,
+                       Configuration blobClientConfig) throws IOException {
                if (this.userJars.isEmpty()) {
                        return;
                }
 
                BlobClient bc = null;
                try {
-                       bc = new BlobClient(serverAddress);
+                       bc = new BlobClient(serverAddress, blobClientConfig);
 
                        for (final Path jar : this.userJars) {
 
@@ -550,10 +553,12 @@ public class JobGraph implements Serializable {
         *
         * @param jobManager JobManager actor gateway
         * @param askTimeout Ask timeout
+        * @param blobClientConfig the blob client configuration
         * @throws IOException Thrown, if the file upload to the JobManager 
failed.
         */
-       public void uploadUserJars(ActorGateway jobManager, FiniteDuration 
askTimeout) throws IOException {
-               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jobManager, 
askTimeout, userJars);
+       public void uploadUserJars(ActorGateway jobManager, FiniteDuration 
askTimeout,
+                       Configuration blobClientConfig) throws IOException {
+               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jobManager, 
askTimeout, blobClientConfig, userJars);
 
                for (BlobKey blobKey : blobKeys) {
                        if (!userJarBlobKeys.contains(blobKey)) {

Reply via email to