http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
new file mode 100644
index 0000000..4c0ebe7
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.http;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHost;
+
+public class HttpProxy {
+    private final String host;
+    private final Integer port;
+    private final String username;
+    private final String password;
+
+    public HttpProxy(final String host, final Integer port, final String 
username, final String password) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+    }
+
+
+    public String getHost() {
+        return host;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public HttpHost getHttpHost() {
+        if (StringUtils.isEmpty(host)) {
+            return null;
+        }
+        return new HttpHost(host, port == null ? 80 : port);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
deleted file mode 100644
index 016690c..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-/**
- * Enumeration of Properties that can be used for the Site-to-Site Socket
- * Protocol.
- */
-public enum HandshakeProperty {
-
-    /**
-     * Boolean value indicating whether or not the contents of a FlowFile 
should
-     * be GZipped when transferred.
-     */
-    GZIP,
-    /**
-     * The unique identifier of the port to communicate with
-     */
-    PORT_IDENTIFIER,
-    /**
-     * Indicates the number of milliseconds after the request was made that the
-     * client will wait for a response. If no response has been received by the
-     * time this value expires, the server can move on without attempting to
-     * service the request because the client will have already disconnected.
-     */
-    REQUEST_EXPIRATION_MILLIS,
-    /**
-     * The preferred number of FlowFiles that the server should send to the
-     * client when pulling data. This property was introduced in version 5 of
-     * the protocol.
-     */
-    BATCH_COUNT,
-    /**
-     * The preferred number of bytes that the server should send to the client
-     * when pulling data. This property was introduced in version 5 of the
-     * protocol.
-     */
-    BATCH_SIZE,
-    /**
-     * The preferred amount of time that the server should send data to the
-     * client when pulling data. This property was introduced in version 5 of
-     * the protocol. Value is in milliseconds.
-     */
-    BATCH_DURATION;
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
deleted file mode 100644
index 6ad2ba0..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public class Response {
-
-    private final ResponseCode code;
-    private final String message;
-
-    private Response(final ResponseCode code, final String explanation) {
-        this.code = code;
-        this.message = explanation;
-    }
-
-    public ResponseCode getCode() {
-        return code;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public static Response read(final DataInputStream in) throws IOException, 
ProtocolException {
-        final ResponseCode code = ResponseCode.readCode(in);
-        final String message = code.containsMessage() ? in.readUTF() : null;
-        return new Response(code, message);
-    }
-
-    @Override
-    public String toString() {
-        return code + ": " + message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
deleted file mode 100644
index 0e1359e..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public enum ResponseCode {
-
-    RESERVED(0, "Reserved for Future Use", false), // This will likely be used 
if we ever need to expand the length of
-    // ResponseCode, so that we can indicate a 0 followed by some other bytes
-
-    // handshaking properties
-    PROPERTIES_OK(1, "Properties OK", false),
-    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
-    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
-    MISSING_PROPERTY(232, "Missing Property", true),
-    // transaction indicators
-    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
-    FINISH_TRANSACTION(11, "Finish Transaction", false),
-    CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of 
this code is the checksum
-    TRANSACTION_FINISHED(13, "Transaction Finished", false),
-    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But 
Destination is Full", false),
-    CANCEL_TRANSACTION(15, "Cancel Transaction", true),
-    BAD_CHECKSUM(19, "Bad Checksum", false),
-    // data availability indicators
-    MORE_DATA(20, "More Data Exists", false),
-    NO_MORE_DATA(21, "No More Data Exists", false),
-    // port state indicators
-    UNKNOWN_PORT(200, "Unknown Port", false),
-    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
-    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
-    // authorization
-    UNAUTHORIZED(240, "User Not Authorized", true),
-    // error indicators
-    ABORT(250, "Abort", true),
-    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
-    END_OF_STREAM(255, "End of Stream", false);
-
-    private static final ResponseCode[] codeArray = new ResponseCode[256];
-
-    static {
-        for (final ResponseCode responseCode : ResponseCode.values()) {
-            codeArray[responseCode.getCode()] = responseCode;
-        }
-    }
-
-    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
-    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
-    private final int code;
-    private final byte[] codeSequence;
-    private final String description;
-    private final boolean containsMessage;
-
-    private ResponseCode(final int code, final String description, final 
boolean containsMessage) {
-        this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, (byte) code};
-        this.code = code;
-        this.description = description;
-        this.containsMessage = containsMessage;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public byte[] getCodeSequence() {
-        return codeSequence;
-    }
-
-    @Override
-    public String toString() {
-        return description;
-    }
-
-    public boolean containsMessage() {
-        return containsMessage;
-    }
-
-    public void writeResponse(final DataOutputStream out) throws IOException {
-        if (containsMessage()) {
-            throw new IllegalArgumentException("ResponseCode " + code + " 
expects an explanation");
-        }
-
-        out.write(getCodeSequence());
-        out.flush();
-    }
-
-    public void writeResponse(final DataOutputStream out, final String 
explanation) throws IOException {
-        if (!containsMessage()) {
-            throw new IllegalArgumentException("ResponseCode " + code + " does 
not expect an explanation");
-        }
-
-        out.write(getCodeSequence());
-        out.writeUTF(explanation);
-        out.flush();
-    }
-
-    static ResponseCode readCode(final InputStream in) throws IOException, 
ProtocolException {
-        final int byte1 = in.read();
-        if (byte1 < 0) {
-            throw new EOFException();
-        } else if (byte1 != CODE_SEQUENCE_VALUE_1) {
-            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
-        }
-
-        final int byte2 = in.read();
-        if (byte2 < 0) {
-            throw new EOFException();
-        } else if (byte2 != CODE_SEQUENCE_VALUE_2) {
-            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
-        }
-
-        final int byte3 = in.read();
-        if (byte3 < 0) {
-            throw new EOFException();
-        }
-
-        final ResponseCode responseCode = codeArray[byte3];
-        if (responseCode == null) {
-            throw new ProtocolException("Received Response Code of " + byte3 + 
" but do not recognize this code");
-        }
-        return responseCode;
-    }
-
-    public static ResponseCode fromSequence(final byte[] value) {
-        final int code = value[3] & 0xFF;
-        final ResponseCode responseCode = codeArray[code];
-        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : 
responseCode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index de845ee..25e78cb 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -16,24 +16,7 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PeerDescription;
 import org.apache.nifi.remote.PeerStatus;
@@ -49,14 +32,23 @@ import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.HandshakeProperty;
 import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
 public class SocketClientProtocol implements ClientProtocol {
 
     private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
@@ -79,8 +71,6 @@ public class SocketClientProtocol implements ClientProtocol {
     private long batchMillis;
     private EventReporter eventReporter;
 
-    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
-
     public SocketClientProtocol() {
     }
 
@@ -193,11 +183,6 @@ public class SocketClientProtocol implements 
ClientProtocol {
     }
 
     @Override
-    public boolean isReadyForFileTransfer() {
-        return readyForFileTransfer;
-    }
-
-    @Override
     public boolean isPortInvalid() {
         if (!handshakeComplete) {
             throw new IllegalStateException("Handshake has not completed 
successfully");
@@ -286,138 +271,6 @@ public class SocketClientProtocol implements 
ClientProtocol {
     }
 
     @Override
-    public int receiveFlowFiles(final Peer peer, final ProcessContext context, 
final ProcessSession session, final FlowFileCodec codec) throws IOException, 
ProtocolException {
-        final String userDn = peer.getCommunicationsSession().getUserDn();
-        final Transaction transaction = startTransaction(peer, codec, 
TransferDirection.RECEIVE);
-
-        final StopWatch stopWatch = new StopWatch(true);
-        final Set<FlowFile> flowFilesReceived = new HashSet<>();
-        long bytesReceived = 0L;
-
-        while (true) {
-            final long start = System.nanoTime();
-            final DataPacket dataPacket = transaction.receive();
-            if (dataPacket == null) {
-                if (flowFilesReceived.isEmpty()) {
-                    peer.penalize(destination.getIdentifier(), 
destination.getYieldPeriod(TimeUnit.MILLISECONDS));
-                }
-                break;
-            }
-
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
-            flowFile = session.importFrom(dataPacket.getData(), flowFile);
-            final long receiveNanos = System.nanoTime() - start;
-
-            String sourceFlowFileIdentifier = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            if (sourceFlowFileIdentifier == null) {
-                sourceFlowFileIdentifier = "<Unknown Identifier>";
-            }
-
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier, "Remote Host="
-                    + peer.getHost() + ", Remote DN=" + userDn, 
TimeUnit.NANOSECONDS.toMillis(receiveNanos));
-
-            session.transfer(flowFile, Relationship.ANONYMOUS);
-            bytesReceived += dataPacket.getSize();
-        }
-
-        // Confirm that what we received was the correct data.
-        transaction.confirm();
-
-        // Commit the session so that we have persisted the data
-        session.commit();
-
-        transaction.complete();
-        logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to 
{}", this, peer);
-
-        if (!flowFilesReceived.isEmpty()) {
-            stopWatch.stop();
-            final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-            final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
-            final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
-            final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-            logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-        }
-
-        return flowFilesReceived.size();
-    }
-
-    @Override
-    public int transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return 0;
-        }
-
-        try {
-            final String userDn = peer.getCommunicationsSession().getUserDn();
-            final long startSendingNanos = System.nanoTime();
-            final StopWatch stopWatch = new StopWatch(true);
-            long bytesSent = 0L;
-
-            final Transaction transaction = startTransaction(peer, codec, 
TransferDirection.SEND);
-
-            final Set<FlowFile> flowFilesSent = new HashSet<>();
-            boolean continueTransaction = true;
-            while (continueTransaction) {
-                final long startNanos = System.nanoTime();
-                // call codec.encode within a session callback so that we have 
the InputStream to read the FlowFile
-                final FlowFile toWrap = flowFile;
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream in) throws 
IOException {
-                        final DataPacket dataPacket = new 
StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
-                        transaction.send(dataPacket);
-                    }
-                });
-
-                final long transferNanos = System.nanoTime() - startNanos;
-                final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
-                flowFilesSent.add(flowFile);
-                bytesSent += flowFile.getSize();
-                logger.debug("{} Sent {} to {}", this, flowFile, peer);
-
-                final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
-                session.getProvenanceReporter().send(flowFile, transitUri, 
"Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, 
false);
-                session.remove(flowFile);
-
-                final long sendingNanos = System.nanoTime() - 
startSendingNanos;
-                if (sendingNanos < BATCH_SEND_NANOS) {
-                    flowFile = session.get();
-                } else {
-                    flowFile = null;
-                }
-
-                continueTransaction = (flowFile != null);
-            }
-
-            transaction.confirm();
-
-            // consume input stream entirely, ignoring its contents. If we
-            // don't do this, the Connection will not be returned to the pool
-            stopWatch.stop();
-            final String uploadDataRate = 
stopWatch.calculateDataRate(bytesSent);
-            final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
-            final String dataSize = FormatUtils.formatDataSize(bytesSent);
-
-            session.commit();
-            transaction.complete();
-
-            final String flowFileDescription = (flowFilesSent.size() < 20) ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds 
at a rate of {}", new Object[]{
-                this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-
-            return flowFilesSent.size();
-        } catch (final Exception e) {
-            session.rollback();
-            throw e;
-        }
-    }
-
-    @Override
     public VersionNegotiator getVersionNegotiator() {
         return versionNegotiator;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index e83ea28..e29f045 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -16,69 +16,31 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Communicant;
+import org.apache.nifi.remote.AbstractTransaction;
 import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransactionCompletion;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
-import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.remote.util.StandardDataPacket;
-import org.apache.nifi.reporting.Severity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nifi.remote.protocol.Response;
+import org.apache.nifi.remote.protocol.ResponseCode;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
-public class SocketClientTransaction implements Transaction {
+public class SocketClientTransaction extends AbstractTransaction {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(SocketClientTransaction.class);
 
-    private final long creationNanoTime = System.nanoTime();
-    private final CRC32 crc = new CRC32();
-    private final int protocolVersion;
-    private final FlowFileCodec codec;
     private final DataInputStream dis;
     private final DataOutputStream dos;
-    private final TransferDirection direction;
-    private final boolean compress;
-    private final Peer peer;
-    private final int penaltyMillis;
-    private final String destinationId;
-    private final EventReporter eventReporter;
-
-    private boolean dataAvailable = false;
-    private int transfers = 0;
-    private long contentBytes = 0;
-    private TransactionState state;
 
     SocketClientTransaction(final int protocolVersion, final String 
destinationId, final Peer peer, final FlowFileCodec codec,
             final TransferDirection direction, final boolean useCompression, 
final int penaltyMillis, final EventReporter eventReporter) throws IOException {
-        this.protocolVersion = protocolVersion;
-        this.destinationId = destinationId;
-        this.peer = peer;
-        this.codec = codec;
-        this.direction = direction;
+        super(peer, direction, useCompression, codec, eventReporter, 
protocolVersion, penaltyMillis, destinationId);
         this.dis = new 
DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
         this.dos = new 
DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
-        this.compress = useCompression;
-        this.state = TransactionState.TRANSACTION_STARTED;
-        this.penaltyMillis = penaltyMillis;
-        this.eventReporter = eventReporter;
 
         initialize();
     }
@@ -116,291 +78,16 @@ public class SocketClientTransaction implements 
Transaction {
     }
 
     @Override
-    public DataPacket receive() throws IOException {
-        try {
-            try {
-                if (state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
-                    throw new IllegalStateException("Cannot receive data from 
" + peer + " because Transaction State is " + state);
-                }
-
-                if (direction == TransferDirection.SEND) {
-                    throw new IllegalStateException("Attempting to receive 
data from " + peer + " but started a SEND Transaction");
-                }
-
-                // if we already know there's no data, just return null
-                if (!dataAvailable) {
-                    return null;
-                }
-
-                // if we have already received a packet, check if another is 
available.
-                if (transfers > 0) {
-                    // Determine if Peer will send us data or has no data to 
send us
-                    final Response dataAvailableCode = Response.read(dis);
-                    switch (dataAvailableCode.getCode()) {
-                        case CONTINUE_TRANSACTION:
-                            logger.debug("{} {} Indicates Transaction should 
continue", this, peer);
-                            this.dataAvailable = true;
-                            break;
-                        case FINISH_TRANSACTION:
-                            logger.debug("{} {} Indicates Transaction should 
finish", this, peer);
-                            this.dataAvailable = false;
-                            break;
-                        default:
-                            throw new ProtocolException("Got unexpected 
response from " + peer + " when asking for data: " + dataAvailableCode);
-                    }
-                }
-
-                // if no data available, return null
-                if (!dataAvailable) {
-                    return null;
-                }
-
-                logger.debug("{} Receiving data from {}", this, peer);
-                final InputStream dataIn = compress ? new 
CompressionInputStream(dis) : dis;
-                final DataPacket packet = codec.decode(new 
CheckedInputStream(dataIn, crc));
-
-                if (packet == null) {
-                    this.dataAvailable = false;
-                } else {
-                    transfers++;
-                    contentBytes += packet.getSize();
-                }
-
-                this.state = TransactionState.DATA_EXCHANGED;
-                return packet;
-            } catch (final IOException ioe) {
-                throw new IOException("Failed to receive data from " + peer + 
" due to " + ioe, ioe);
-            }
-        } catch (final Exception e) {
-            error();
-            throw e;
-        }
-    }
-
-    @Override
-    public void send(final byte[] content, final Map<String, String> 
attributes) throws IOException {
-        send(new StandardDataPacket(attributes, new 
ByteArrayInputStream(content), content.length));
+    protected Response readTransactionResponse() throws IOException {
+        return Response.read(dis);
     }
 
     @Override
-    public void send(final DataPacket dataPacket) throws IOException {
-        try {
-            try {
-                if (state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
-                    throw new IllegalStateException("Cannot send data to " + 
peer + " because Transaction State is " + state);
-                }
-
-                if (direction == TransferDirection.RECEIVE) {
-                    throw new IllegalStateException("Attempting to send data 
to " + peer + " but started a RECEIVE Transaction");
-                }
-
-                if (transfers > 0) {
-                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-                }
-
-                logger.debug("{} Sending data to {}", this, peer);
-
-                final OutputStream dataOut = compress ? new 
CompressionOutputStream(dos) : dos;
-                final OutputStream out = new CheckedOutputStream(dataOut, crc);
-                codec.encode(dataPacket, out);
-
-                // need to close the CompressionOutputStream in order to force 
it write out any remaining bytes.
-                // Otherwise, do NOT close it because we don't want to close 
the underlying stream
-                // (CompressionOutputStream will not close the underlying 
stream when it's closed)
-                if (compress) {
-                    out.close();
-                }
-
-                transfers++;
-                contentBytes += dataPacket.getSize();
-                this.state = TransactionState.DATA_EXCHANGED;
-            } catch (final IOException ioe) {
-                throw new IOException("Failed to send data to " + peer + " due 
to " + ioe, ioe);
-            }
-        } catch (final Exception e) {
-            error();
-            throw e;
-        }
-    }
-
-    @Override
-    public void cancel(final String explanation) throws IOException {
-        if (state == TransactionState.TRANSACTION_CANCELED || state == 
TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) {
-            throw new IllegalStateException("Cannot cancel transaction because 
state is already " + state);
-        }
-
-        try {
-            ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == 
null ? "<No explanation given>" : explanation);
-            state = TransactionState.TRANSACTION_CANCELED;
-        } catch (final IOException ioe) {
-            error();
-            throw new IOException("Failed to send 'cancel transaction' message 
to " + peer + " due to " + ioe, ioe);
+    protected void writeTransactionResponse(ResponseCode response, String 
explanation) throws IOException {
+        if(explanation == null){
+            response.writeResponse(dos);
+        } else {
+            response.writeResponse(dos, explanation);
         }
     }
-
-    @Override
-    public TransactionCompletion complete() throws IOException {
-        try {
-            try {
-                if (state != TransactionState.TRANSACTION_CONFIRMED) {
-                    throw new IllegalStateException("Cannot complete 
transaction with " + peer + " because state is " + state
-                            + "; Transaction can only be completed when state 
is " + TransactionState.TRANSACTION_CONFIRMED);
-                }
-
-                boolean backoff = false;
-                if (direction == TransferDirection.RECEIVE) {
-                    if (transfers == 0) {
-                        state = TransactionState.TRANSACTION_COMPLETED;
-                        return new SocketClientTransactionCompletion(false, 0, 
0L, System.nanoTime() - creationNanoTime);
-                    }
-
-                    // Confirm that we received the data and the peer can now 
discard it
-                    logger.debug("{} Sending TRANSACTION_FINISHED to {}", 
this, peer);
-                    ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-
-                    state = TransactionState.TRANSACTION_COMPLETED;
-                } else {
-                    final Response transactionResponse;
-                    try {
-                        transactionResponse = Response.read(dis);
-                    } catch (final IOException e) {
-                        throw new IOException(this + " Failed to receive a 
response from " + peer + " when expecting a TransactionFinished Indicator. "
-                                + "It is unknown whether or not the peer 
successfully received/processed the data.", e);
-                    }
-
-                    logger.debug("{} Received {} from {}", this, 
transactionResponse, peer);
-                    if (transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
-                        peer.penalize(destinationId, penaltyMillis);
-                        backoff = true;
-                    } else if (transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED) {
-                        throw new ProtocolException("After sending data to " + 
peer + ", expected TRANSACTION_FINISHED response but got " + 
transactionResponse);
-                    }
-
-                    state = TransactionState.TRANSACTION_COMPLETED;
-                }
-
-                return new SocketClientTransactionCompletion(backoff, 
transfers, contentBytes, System.nanoTime() - creationNanoTime);
-            } catch (final IOException ioe) {
-                throw new IOException("Failed to complete transaction with " + 
peer + " due to " + ioe, ioe);
-            }
-        } catch (final Exception e) {
-            error();
-            throw e;
-        }
-    }
-
-    @Override
-    public void confirm() throws IOException {
-        try {
-            try {
-                if (state == TransactionState.TRANSACTION_STARTED && 
!dataAvailable && direction == TransferDirection.RECEIVE) {
-                    // client requested to receive data but no data available. 
no need to confirm.
-                    state = TransactionState.TRANSACTION_CONFIRMED;
-                    return;
-                }
-
-                if (state != TransactionState.DATA_EXCHANGED) {
-                    throw new IllegalStateException("Cannot confirm 
Transaction because state is " + state
-                            + "; Transaction can only be confirmed when state 
is " + TransactionState.DATA_EXCHANGED);
-                }
-
-                if (direction == TransferDirection.RECEIVE) {
-                    if (dataAvailable) {
-                        throw new IllegalStateException("Cannot complete 
transaction because the sender has already sent more data than client has 
consumed.");
-                    }
-
-                    // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-                    // to peer so that we can verify that the connection is 
still open. This is a two-phase commit,
-                    // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the
-                    // session and then when we send the response back to the 
peer, the peer may have timed out and may not
-                    // be listening. As a result, it will re-send the data. By 
doing this two-phase commit, we narrow the
-                    // Critical Section involved in this transaction so that 
rather than the Critical Section being the
-                    // time window involved in the entire transaction, it is 
reduced to a simple round-trip conversation.
-                    logger.trace("{} Sending CONFIRM_TRANSACTION Response Code 
to {}", this, peer);
-                    final String calculatedCRC = 
String.valueOf(crc.getValue());
-                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, 
calculatedCRC);
-
-                    final Response confirmTransactionResponse;
-                    try {
-                        confirmTransactionResponse = Response.read(dis);
-                    } catch (final IOException ioe) {
-                        logger.error("Failed to receive response code from {} 
when expecting confirmation of transaction", peer);
-                        if (eventReporter != null) {
-                            eventReporter.reportEvent(Severity.ERROR, 
"Site-to-Site", "Failed to receive response code from " + peer + " when 
expecting confirmation of transaction");
-                        }
-                        throw ioe;
-                    }
-
-                    logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
-
-                    switch (confirmTransactionResponse.getCode()) {
-                        case CONFIRM_TRANSACTION:
-                            break;
-                        case BAD_CHECKSUM:
-                            throw new IOException(this + " Received a 
BadChecksum response from peer " + peer);
-                        default:
-                            throw new ProtocolException(this + " Received 
unexpected Response from peer " + peer + " : "
-                                    + confirmTransactionResponse + "; expected 
'Confirm Transaction' Response Code");
-                    }
-
-                    state = TransactionState.TRANSACTION_CONFIRMED;
-                } else {
-                    logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", 
this, peer);
-                    ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-
-                    final String calculatedCRC = 
String.valueOf(crc.getValue());
-
-                    // we've sent a FINISH_TRANSACTION. Now we'll wait for the 
peer to send a 'Confirm Transaction' response
-                    final Response transactionConfirmationResponse = 
Response.read(dis);
-                    if (transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION) {
-                        // Confirm checksum and echo back the confirmation.
-                        logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
-                        final String receivedCRC = 
transactionConfirmationResponse.getMessage();
-
-                        // CRC was not used before version 4
-                        if (protocolVersion > 3) {
-                            if (!receivedCRC.equals(calculatedCRC)) {
-                                ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                                throw new IOException(this + " Sent data to 
peer " + peer + " but calculated CRC32 Checksum as "
-                                        + calculatedCRC + " while peer 
calculated CRC32 Checksum as "
-                                        + receivedCRC + "; canceling 
transaction and rolling back session");
-                            }
-                        }
-
-                        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, 
"");
-                    } else {
-                        throw new ProtocolException("Expected to receive 
'Confirm Transaction' response from peer "
-                                + peer + " but received " + 
transactionConfirmationResponse);
-                    }
-
-                    state = TransactionState.TRANSACTION_CONFIRMED;
-                }
-            } catch (final IOException ioe) {
-                throw new IOException("Failed to confirm transaction with " + 
peer + " due to " + ioe, ioe);
-            }
-        } catch (final Exception e) {
-            error();
-            throw e;
-        }
-    }
-
-    @Override
-    public void error() {
-        this.state = TransactionState.ERROR;
-    }
-
-    @Override
-    public TransactionState getState() {
-        return state;
-    }
-
-    @Override
-    public Communicant getCommunicant() {
-        return peer;
-    }
-
-    @Override
-    public String toString() {
-        return "SocketClientTransaction[Url=" + peer.getUrl() + ", 
TransferDirection=" + direction + ", State=" + state + "]";
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
deleted file mode 100644
index 136fe8d..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.remote.TransactionCompletion;
-
-public class SocketClientTransactionCompletion implements 
TransactionCompletion {
-
-    private final boolean backoff;
-    private final int dataPacketsTransferred;
-    private final long bytesTransferred;
-    private final long durationNanos;
-
-    public SocketClientTransactionCompletion(final boolean backoff, final int 
dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
-        this.backoff = backoff;
-        this.dataPacketsTransferred = dataPacketsTransferred;
-        this.bytesTransferred = bytesTransferred;
-        this.durationNanos = durationNanos;
-    }
-
-    @Override
-    public boolean isBackoff() {
-        return backoff;
-    }
-
-    @Override
-    public int getDataPacketsTransferred() {
-        return dataPacketsTransferred;
-    }
-
-    @Override
-    public long getBytesTransferred() {
-        return bytesTransferred;
-    }
-
-    @Override
-    public long getDuration(final TimeUnit timeUnit) {
-        return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java
new file mode 100644
index 0000000..d6facb3
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.helpers.MessageFormatter;
+
+public class EventReportUtil {
+
+    private static final String CATEGORY = "Site-to-Site";
+
+    public static void warn(final Logger logger, final EventReporter 
eventReporter, final String msg, final Object... args) {
+        logger.warn(msg, args);
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.WARNING, CATEGORY, 
MessageFormatter.arrayFormat(msg, args).getMessage());
+        }
+    }
+
+    public static void warn(final Logger logger, final EventReporter 
eventReporter, final String msg, final Throwable t) {
+        logger.warn(msg, t);
+
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.WARNING, CATEGORY, msg + ": " + 
t.toString());
+        }
+    }
+
+    public static void error(final Logger logger, final EventReporter 
eventReporter, final String msg, final Object... args) {
+        logger.error(msg, args);
+        if (eventReporter != null) {
+            eventReporter.reportEvent(Severity.ERROR, CATEGORY, 
MessageFormatter.arrayFormat(msg, args).getMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
deleted file mode 100644
index 92d3408..0000000
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSession;
-
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class NiFiRestApiUtil {
-
-    public static final int RESPONSE_CODE_OK = 200;
-
-    private final SSLContext sslContext;
-
-    public NiFiRestApiUtil(final SSLContext sslContext) {
-        this.sslContext = sslContext;
-    }
-
-    private HttpURLConnection getConnection(final String connUrl, final int 
timeoutMillis) throws IOException {
-        final URL url = new URL(connUrl);
-        final HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
-        connection.setConnectTimeout(timeoutMillis);
-        connection.setReadTimeout(timeoutMillis);
-
-        // special handling for https
-        if (sslContext != null && connection instanceof HttpsURLConnection) {
-            HttpsURLConnection secureConnection = (HttpsURLConnection) 
connection;
-            
secureConnection.setSSLSocketFactory(sslContext.getSocketFactory());
-
-            // check the trusted hostname property and override the 
HostnameVerifier
-            secureConnection.setHostnameVerifier(new 
OverrideHostnameVerifier(url.getHost(),
-                    secureConnection.getHostnameVerifier()));
-        }
-
-        return connection;
-    }
-
-    public ControllerDTO getController(final String url, final int 
timeoutMillis) throws IOException {
-        final HttpURLConnection connection = getConnection(url, timeoutMillis);
-        connection.setRequestMethod("GET");
-        final int responseCode = connection.getResponseCode();
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        StreamUtils.copy(connection.getInputStream(), baos);
-        final String responseMessage = baos.toString();
-
-        if (responseCode == RESPONSE_CODE_OK) {
-            final ObjectMapper mapper = new ObjectMapper();
-            final JsonNode jsonNode = mapper.readTree(responseMessage);
-            final JsonNode controllerNode = jsonNode.get("controller");
-            return mapper.readValue(controllerNode, ControllerDTO.class);
-        } else {
-            throw new IOException("Got HTTP response Code " + responseCode + 
": " + connection.getResponseMessage() + " with explanation: " + 
responseMessage);
-        }
-    }
-
-    private static class OverrideHostnameVerifier implements HostnameVerifier {
-
-        private final String trustedHostname;
-        private final HostnameVerifier delegate;
-
-        private OverrideHostnameVerifier(String trustedHostname, 
HostnameVerifier delegate) {
-            this.trustedHostname = trustedHostname;
-            this.delegate = delegate;
-        }
-
-        @Override
-        public boolean verify(String hostname, SSLSession session) {
-            if (trustedHostname.equalsIgnoreCase(hostname)) {
-                return true;
-            }
-            return delegate.verify(hostname, session);
-        }
-    }
-}

Reply via email to