http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java new file mode 100644 index 0000000..4c0ebe7 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/http/HttpProxy.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.http; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; + +public class HttpProxy { + private final String host; + private final Integer port; + private final String username; + private final String password; + + public HttpProxy(final String host, final Integer port, final String username, final String password) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + } + + + public String getHost() { + return host; + } + + public Integer getPort() { + return port; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public HttpHost getHttpHost() { + if (StringUtils.isEmpty(host)) { + return null; + } + return new HttpHost(host, port == null ? 80 : port); + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java deleted file mode 100644 index 016690c..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -/** - * Enumeration of Properties that can be used for the Site-to-Site Socket - * Protocol. - */ -public enum HandshakeProperty { - - /** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ - GZIP, - /** - * The unique identifier of the port to communicate with - */ - PORT_IDENTIFIER, - /** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ - REQUEST_EXPIRATION_MILLIS, - /** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ - BATCH_COUNT, - /** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ - BATCH_SIZE, - /** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ - BATCH_DURATION; -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java deleted file mode 100644 index 6ad2ba0..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.nifi.remote.exception.ProtocolException; - -public class Response { - - private final ResponseCode code; - private final String message; - - private Response(final ResponseCode code, final String explanation) { - this.code = code; - this.message = explanation; - } - - public ResponseCode getCode() { - return code; - } - - public String getMessage() { - return message; - } - - public static Response read(final DataInputStream in) throws IOException, ProtocolException { - final ResponseCode code = ResponseCode.readCode(in); - final String message = code.containsMessage() ? in.readUTF() : null; - return new Response(code, message); - } - - @Override - public String toString() { - return code + ": " + message; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java deleted file mode 100644 index 0e1359e..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.remote.exception.ProtocolException; - -public enum ResponseCode { - - RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of - // ResponseCode, so that we can indicate a 0 followed by some other bytes - - // handshaking properties - PROPERTIES_OK(1, "Properties OK", false), - UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), - ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), - MISSING_PROPERTY(232, "Missing Property", true), - // transaction indicators - CONTINUE_TRANSACTION(10, "Continue Transaction", false), - FINISH_TRANSACTION(11, "Finish Transaction", false), - CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum - TRANSACTION_FINISHED(13, "Transaction Finished", false), - TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), - CANCEL_TRANSACTION(15, "Cancel Transaction", true), - BAD_CHECKSUM(19, "Bad Checksum", false), - // data availability indicators - MORE_DATA(20, "More Data Exists", false), - NO_MORE_DATA(21, "No More Data Exists", false), - // port state indicators - UNKNOWN_PORT(200, "Unknown Port", false), - PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), - PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), - // authorization - UNAUTHORIZED(240, "User Not Authorized", true), - // error indicators - ABORT(250, "Abort", true), - UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), - END_OF_STREAM(255, "End of Stream", false); - - private static final ResponseCode[] codeArray = new ResponseCode[256]; - - static { - for (final ResponseCode responseCode : ResponseCode.values()) { - codeArray[responseCode.getCode()] = responseCode; - } - } - - private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; - private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; - private final int code; - private final byte[] codeSequence; - private final String description; - private final boolean containsMessage; - - private ResponseCode(final int code, final String description, final boolean containsMessage) { - this.codeSequence = new byte[]{CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; - this.code = code; - this.description = description; - this.containsMessage = containsMessage; - } - - public int getCode() { - return code; - } - - public byte[] getCodeSequence() { - return codeSequence; - } - - @Override - public String toString() { - return description; - } - - public boolean containsMessage() { - return containsMessage; - } - - public void writeResponse(final DataOutputStream out) throws IOException { - if (containsMessage()) { - throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); - } - - out.write(getCodeSequence()); - out.flush(); - } - - public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { - if (!containsMessage()) { - throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); - } - - out.write(getCodeSequence()); - out.writeUTF(explanation); - out.flush(); - } - - static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { - final int byte1 = in.read(); - if (byte1 < 0) { - throw new EOFException(); - } else if (byte1 != CODE_SEQUENCE_VALUE_1) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte2 = in.read(); - if (byte2 < 0) { - throw new EOFException(); - } else if (byte2 != CODE_SEQUENCE_VALUE_2) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte3 = in.read(); - if (byte3 < 0) { - throw new EOFException(); - } - - final ResponseCode responseCode = codeArray[byte3]; - if (responseCode == null) { - throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); - } - return responseCode; - } - - public static ResponseCode fromSequence(final byte[] value) { - final int code = value[3] & 0xFF; - final ResponseCode responseCode = codeArray[code]; - return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java index de845ee..25e78cb 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -16,24 +16,7 @@ */ package org.apache.nifi.remote.protocol.socket; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.events.EventReporter; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerDescription; import org.apache.nifi.remote.PeerStatus; @@ -49,14 +32,23 @@ import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.HandshakeProperty; import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.remote.util.StandardDataPacket; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.StopWatch; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + public class SocketClientProtocol implements ClientProtocol { private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); @@ -79,8 +71,6 @@ public class SocketClientProtocol implements ClientProtocol { private long batchMillis; private EventReporter eventReporter; - private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds - public SocketClientProtocol() { } @@ -193,11 +183,6 @@ public class SocketClientProtocol implements ClientProtocol { } @Override - public boolean isReadyForFileTransfer() { - return readyForFileTransfer; - } - - @Override public boolean isPortInvalid() { if (!handshakeComplete) { throw new IllegalStateException("Handshake has not completed successfully"); @@ -286,138 +271,6 @@ public class SocketClientProtocol implements ClientProtocol { } @Override - public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - final String userDn = peer.getCommunicationsSession().getUserDn(); - final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); - - final StopWatch stopWatch = new StopWatch(true); - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - - while (true) { - final long start = System.nanoTime(); - final DataPacket dataPacket = transaction.receive(); - if (dataPacket == null) { - if (flowFilesReceived.isEmpty()) { - peer.penalize(destination.getIdentifier(), destination.getYieldPeriod(TimeUnit.MILLISECONDS)); - } - break; - } - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); - flowFile = session.importFrom(dataPacket.getData(), flowFile); - final long receiveNanos = System.nanoTime() - start; - - String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); - if (sourceFlowFileIdentifier == null) { - sourceFlowFileIdentifier = "<Unknown Identifier>"; - } - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" - + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); - - session.transfer(flowFile, Relationship.ANONYMOUS); - bytesReceived += dataPacket.getSize(); - } - - // Confirm that what we received was the correct data. - transaction.confirm(); - - // Commit the session so that we have persisted the data - session.commit(); - - transaction.complete(); - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - - if (!flowFilesReceived.isEmpty()) { - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } - - return flowFilesReceived.size(); - } - - @Override - public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return 0; - } - - try { - final String userDn = peer.getCommunicationsSession().getUserDn(); - final long startSendingNanos = System.nanoTime(); - final StopWatch stopWatch = new StopWatch(true); - long bytesSent = 0L; - - final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND); - - final Set<FlowFile> flowFilesSent = new HashSet<>(); - boolean continueTransaction = true; - while (continueTransaction) { - final long startNanos = System.nanoTime(); - // call codec.encode within a session callback so that we have the InputStream to read the FlowFile - final FlowFile toWrap = flowFile; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize()); - transaction.send(dataPacket); - } - }); - - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - logger.debug("{} Sent {} to {}", this, flowFile, peer); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); - session.remove(flowFile); - - final long sendingNanos = System.nanoTime() - startSendingNanos; - if (sendingNanos < BATCH_SEND_NANOS) { - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - } - - transaction.confirm(); - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - - session.commit(); - transaction.complete(); - - final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{ - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - - return flowFilesSent.size(); - } catch (final Exception e) { - session.rollback(); - throw e; - } - } - - @Override public VersionNegotiator getVersionNegotiator() { return versionNegotiator; } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java index e83ea28..e29f045 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -16,69 +16,31 @@ */ package org.apache.nifi.remote.protocol.socket; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; - import org.apache.nifi.events.EventReporter; -import org.apache.nifi.remote.Communicant; +import org.apache.nifi.remote.AbstractTransaction; import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransactionCompletion; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; -import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.remote.util.StandardDataPacket; -import org.apache.nifi.reporting.Severity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.nifi.remote.protocol.Response; +import org.apache.nifi.remote.protocol.ResponseCode; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; -public class SocketClientTransaction implements Transaction { +public class SocketClientTransaction extends AbstractTransaction { - private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); - private final long creationNanoTime = System.nanoTime(); - private final CRC32 crc = new CRC32(); - private final int protocolVersion; - private final FlowFileCodec codec; private final DataInputStream dis; private final DataOutputStream dos; - private final TransferDirection direction; - private final boolean compress; - private final Peer peer; - private final int penaltyMillis; - private final String destinationId; - private final EventReporter eventReporter; - - private boolean dataAvailable = false; - private int transfers = 0; - private long contentBytes = 0; - private TransactionState state; SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec, final TransferDirection direction, final boolean useCompression, final int penaltyMillis, final EventReporter eventReporter) throws IOException { - this.protocolVersion = protocolVersion; - this.destinationId = destinationId; - this.peer = peer; - this.codec = codec; - this.direction = direction; + super(peer, direction, useCompression, codec, eventReporter, protocolVersion, penaltyMillis, destinationId); this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); - this.compress = useCompression; - this.state = TransactionState.TRANSACTION_STARTED; - this.penaltyMillis = penaltyMillis; - this.eventReporter = eventReporter; initialize(); } @@ -116,291 +78,16 @@ public class SocketClientTransaction implements Transaction { } @Override - public DataPacket receive() throws IOException { - try { - try { - if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot receive data from " + peer + " because Transaction State is " + state); - } - - if (direction == TransferDirection.SEND) { - throw new IllegalStateException("Attempting to receive data from " + peer + " but started a SEND Transaction"); - } - - // if we already know there's no data, just return null - if (!dataAvailable) { - return null; - } - - // if we have already received a packet, check if another is available. - if (transfers > 0) { - // Determine if Peer will send us data or has no data to send us - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case CONTINUE_TRANSACTION: - logger.debug("{} {} Indicates Transaction should continue", this, peer); - this.dataAvailable = true; - break; - case FINISH_TRANSACTION: - logger.debug("{} {} Indicates Transaction should finish", this, peer); - this.dataAvailable = false; - break; - default: - throw new ProtocolException("Got unexpected response from " + peer + " when asking for data: " + dataAvailableCode); - } - } - - // if no data available, return null - if (!dataAvailable) { - return null; - } - - logger.debug("{} Receiving data from {}", this, peer); - final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis; - final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); - - if (packet == null) { - this.dataAvailable = false; - } else { - transfers++; - contentBytes += packet.getSize(); - } - - this.state = TransactionState.DATA_EXCHANGED; - return packet; - } catch (final IOException ioe) { - throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - @Override - public void send(final byte[] content, final Map<String, String> attributes) throws IOException { - send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length)); + protected Response readTransactionResponse() throws IOException { + return Response.read(dis); } @Override - public void send(final DataPacket dataPacket) throws IOException { - try { - try { - if (state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { - throw new IllegalStateException("Cannot send data to " + peer + " because Transaction State is " + state); - } - - if (direction == TransferDirection.RECEIVE) { - throw new IllegalStateException("Attempting to send data to " + peer + " but started a RECEIVE Transaction"); - } - - if (transfers > 0) { - ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); - } - - logger.debug("{} Sending data to {}", this, peer); - - final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos; - final OutputStream out = new CheckedOutputStream(dataOut, crc); - codec.encode(dataPacket, out); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if (compress) { - out.close(); - } - - transfers++; - contentBytes += dataPacket.getSize(); - this.state = TransactionState.DATA_EXCHANGED; - } catch (final IOException ioe) { - throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - @Override - public void cancel(final String explanation) throws IOException { - if (state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR) { - throw new IllegalStateException("Cannot cancel transaction because state is already " + state); - } - - try { - ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation); - state = TransactionState.TRANSACTION_CANCELED; - } catch (final IOException ioe) { - error(); - throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe); + protected void writeTransactionResponse(ResponseCode response, String explanation) throws IOException { + if(explanation == null){ + response.writeResponse(dos); + } else { + response.writeResponse(dos, explanation); } } - - @Override - public TransactionCompletion complete() throws IOException { - try { - try { - if (state != TransactionState.TRANSACTION_CONFIRMED) { - throw new IllegalStateException("Cannot complete transaction with " + peer + " because state is " + state - + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); - } - - boolean backoff = false; - if (direction == TransferDirection.RECEIVE) { - if (transfers == 0) { - state = TransactionState.TRANSACTION_COMPLETED; - return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime); - } - - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - - state = TransactionState.TRANSACTION_COMPLETED; - } else { - final Response transactionResponse; - try { - transactionResponse = Response.read(dis); - } catch (final IOException e) { - throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " - + "It is unknown whether or not the peer successfully received/processed the data.", e); - } - - logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) { - peer.penalize(destinationId, penaltyMillis); - backoff = true; - } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) { - throw new ProtocolException("After sending data to " + peer + ", expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - - state = TransactionState.TRANSACTION_COMPLETED; - } - - return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime); - } catch (final IOException ioe) { - throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - @Override - public void confirm() throws IOException { - try { - try { - if (state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE) { - // client requested to receive data but no data available. no need to confirm. - state = TransactionState.TRANSACTION_CONFIRMED; - return; - } - - if (state != TransactionState.DATA_EXCHANGED) { - throw new IllegalStateException("Cannot confirm Transaction because state is " + state - + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED); - } - - if (direction == TransferDirection.RECEIVE) { - if (dataAvailable) { - throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - final String calculatedCRC = String.valueOf(crc.getValue()); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - - final Response confirmTransactionResponse; - try { - confirmTransactionResponse = Response.read(dis); - } catch (final IOException ioe) { - logger.error("Failed to receive response code from {} when expecting confirmation of transaction", peer); - if (eventReporter != null) { - eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + peer + " when expecting confirmation of transaction"); - } - throw ioe; - } - - logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " - + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - state = TransactionState.TRANSACTION_CONFIRMED; - } else { - logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - - final String calculatedCRC = String.valueOf(crc.getValue()); - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(dis); - if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) { - // Confirm checksum and echo back the confirmation. - logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - // CRC was not used before version 4 - if (protocolVersion > 3) { - if (!receivedCRC.equals(calculatedCRC)) { - ResponseCode.BAD_CHECKSUM.writeResponse(dos); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " - + calculatedCRC + " while peer calculated CRC32 Checksum as " - + receivedCRC + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " - + peer + " but received " + transactionConfirmationResponse); - } - - state = TransactionState.TRANSACTION_CONFIRMED; - } - } catch (final IOException ioe) { - throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe); - } - } catch (final Exception e) { - error(); - throw e; - } - } - - @Override - public void error() { - this.state = TransactionState.ERROR; - } - - @Override - public TransactionState getState() { - return state; - } - - @Override - public Communicant getCommunicant() { - return peer; - } - - @Override - public String toString() { - return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]"; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java deleted file mode 100644 index 136fe8d..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.remote.TransactionCompletion; - -public class SocketClientTransactionCompletion implements TransactionCompletion { - - private final boolean backoff; - private final int dataPacketsTransferred; - private final long bytesTransferred; - private final long durationNanos; - - public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) { - this.backoff = backoff; - this.dataPacketsTransferred = dataPacketsTransferred; - this.bytesTransferred = bytesTransferred; - this.durationNanos = durationNanos; - } - - @Override - public boolean isBackoff() { - return backoff; - } - - @Override - public int getDataPacketsTransferred() { - return dataPacketsTransferred; - } - - @Override - public long getBytesTransferred() { - return bytesTransferred; - } - - @Override - public long getDuration(final TimeUnit timeUnit) { - return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java new file mode 100644 index 0000000..d6facb3 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/EventReportUtil.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +public class EventReportUtil { + + private static final String CATEGORY = "Site-to-Site"; + + public static void warn(final Logger logger, final EventReporter eventReporter, final String msg, final Object... args) { + logger.warn(msg, args); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, CATEGORY, MessageFormatter.arrayFormat(msg, args).getMessage()); + } + } + + public static void warn(final Logger logger, final EventReporter eventReporter, final String msg, final Throwable t) { + logger.warn(msg, t); + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, CATEGORY, msg + ": " + t.toString()); + } + } + + public static void error(final Logger logger, final EventReporter eventReporter, final String msg, final Object... args) { + logger.error(msg, args); + if (eventReporter != null) { + eventReporter.reportEvent(Severity.ERROR, CATEGORY, MessageFormatter.arrayFormat(msg, args).getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c120c498/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java deleted file mode 100644 index 92d3408..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/NiFiRestApiUtil.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.util; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; - -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.web.api.dto.ControllerDTO; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; - -public class NiFiRestApiUtil { - - public static final int RESPONSE_CODE_OK = 200; - - private final SSLContext sslContext; - - public NiFiRestApiUtil(final SSLContext sslContext) { - this.sslContext = sslContext; - } - - private HttpURLConnection getConnection(final String connUrl, final int timeoutMillis) throws IOException { - final URL url = new URL(connUrl); - final HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setConnectTimeout(timeoutMillis); - connection.setReadTimeout(timeoutMillis); - - // special handling for https - if (sslContext != null && connection instanceof HttpsURLConnection) { - HttpsURLConnection secureConnection = (HttpsURLConnection) connection; - secureConnection.setSSLSocketFactory(sslContext.getSocketFactory()); - - // check the trusted hostname property and override the HostnameVerifier - secureConnection.setHostnameVerifier(new OverrideHostnameVerifier(url.getHost(), - secureConnection.getHostnameVerifier())); - } - - return connection; - } - - public ControllerDTO getController(final String url, final int timeoutMillis) throws IOException { - final HttpURLConnection connection = getConnection(url, timeoutMillis); - connection.setRequestMethod("GET"); - final int responseCode = connection.getResponseCode(); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - StreamUtils.copy(connection.getInputStream(), baos); - final String responseMessage = baos.toString(); - - if (responseCode == RESPONSE_CODE_OK) { - final ObjectMapper mapper = new ObjectMapper(); - final JsonNode jsonNode = mapper.readTree(responseMessage); - final JsonNode controllerNode = jsonNode.get("controller"); - return mapper.readValue(controllerNode, ControllerDTO.class); - } else { - throw new IOException("Got HTTP response Code " + responseCode + ": " + connection.getResponseMessage() + " with explanation: " + responseMessage); - } - } - - private static class OverrideHostnameVerifier implements HostnameVerifier { - - private final String trustedHostname; - private final HostnameVerifier delegate; - - private OverrideHostnameVerifier(String trustedHostname, HostnameVerifier delegate) { - this.trustedHostname = trustedHostname; - this.delegate = delegate; - } - - @Override - public boolean verify(String hostname, SSLSession session) { - if (trustedHostname.equalsIgnoreCase(hostname)) { - return true; - } - return delegate.verify(hostname, session); - } - } -}