http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/src/Site2SiteClientProtocol.cpp b/src/Site2SiteClientProtocol.cpp deleted file mode 100644 index 88ea78a..0000000 --- a/src/Site2SiteClientProtocol.cpp +++ /dev/null @@ -1,1313 +0,0 @@ -/** - * @file Site2SiteProtocol.cpp - * Site2SiteProtocol class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <sys/time.h> -#include <stdio.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include <netinet/tcp.h> -#include <iostream> -#include "Site2SitePeer.h" -#include "Site2SiteClientProtocol.h" - -bool Site2SiteClientProtocol::establish() -{ - if (_peerState != IDLE) - { - _logger->log_error("Site2Site peer state is not idle while try to establish"); - return false; - } - - bool ret = _peer->Open(); - - if (!ret) - { - _logger->log_error("Site2Site peer socket open failed"); - return false; - } - - // Negotiate the version - ret = initiateResourceNegotiation(); - - if (!ret) - { - _logger->log_error("Site2Site Protocol Version Negotiation failed"); - /* - _peer->yield(); - tearDown(); */ - return false; - } - - _logger->log_info("Site2Site socket established"); - _peerState = ESTABLISHED; - - return true; -} - -bool Site2SiteClientProtocol::initiateResourceNegotiation() -{ - // Negotiate the version - if (_peerState != IDLE) - { - _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); - return false; - } - - _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); - - int ret = _peer->writeUTF(this->getResourceName()); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - ret = _peer->write(_currentVersion); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - uint8_t statusCode; - ret = _peer->read(statusCode); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - switch (statusCode) - { - case RESOURCE_OK: - _logger->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = _peer->read(serverVersion); - if (ret <= 0) - { - // tearDown(); - return false; - } - _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++) - { - if (serverVersion >= _supportedVersion[i]) - { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - _logger->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - _logger->log_info("Negotiate protocol response unknown code %d", statusCode); - return true; - } - - return true; -} - -bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() -{ - // Negotiate the version - if (_peerState != HANDSHAKED) - { - _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); - return false; - } - - _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); - - int ret = _peer->writeUTF(this->getCodecResourceName()); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - ret = _peer->write(_currentCodecVersion); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - uint8_t statusCode; - ret = _peer->read(statusCode); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - switch (statusCode) - { - case RESOURCE_OK: - _logger->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = _peer->read(serverVersion); - if (ret <= 0) - { - // tearDown(); - return false; - } - _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++) - { - if (serverVersion >= _supportedCodecVersion[i]) - { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - _logger->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - _logger->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; - } - - return true; -} - -bool Site2SiteClientProtocol::handShake() -{ - if (_peerState != ESTABLISHED) - { - _logger->log_error("Site2Site peer state is not established while handshake"); - return false; - } - _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); - uuid_t uuid; - // Generate the global UUID for the com identify - uuid_generate(uuid); - char uuidStr[37]; - uuid_unparse(uuid, uuidStr); - _commsIdentifier = uuidStr; - - int ret = _peer->writeUTF(_commsIdentifier); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - std::map<std::string, std::string> properties; - properties[HandShakePropertyStr[GZIP]] = "false"; - properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; - properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); - if (this->_currentVersion >= 5) - { - if (this->_batchCount > 0) - properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); - if (this->_batchSize > 0) - properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); - if (this->_batchDuration > 0) - properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); - } - - if (_currentVersion >= 3) - { - ret = _peer->writeUTF(_peer->getURL()); - if (ret <= 0) - { - // tearDown(); - return false; - } - } - - uint32_t size = properties.size(); - ret = _peer->write(size); - if (ret <= 0) - { - // tearDown(); - return false; - } - - std::map<std::string, std::string>::iterator it; - for (it = properties.begin(); it!= properties.end(); it++) - { - ret = _peer->writeUTF(it->first); - if (ret <= 0) - { - // tearDown(); - return false; - } - ret = _peer->writeUTF(it->second); - if (ret <= 0) - { - // tearDown(); - return false; - } - _logger->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); - } - - RespondCode code; - std::string message; - - ret = this->readRespond(code, message); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - switch (code) - { - case PROPERTIES_OK: - _logger->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - /* - _peer->yield(); - tearDown(); */ - return false; - default: - _logger->log_info("HandShake Failed because of unknown respond code %d", code); - ret = -1; - /* - _peer->yield(); - tearDown(); */ - return false; - } - - return false; -} - -void Site2SiteClientProtocol::tearDown() -{ - if (_peerState >= ESTABLISHED) - { - _logger->log_info("Site2Site Protocol tearDown"); - // need to write shutdown request - writeRequestType(SHUTDOWN); - } - - std::map<std::string, Transaction *>::iterator it; - for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++) - { - delete it->second; - } - _transactionMap.clear(); - _peer->Close(); - _peerState = IDLE; -} - -int Site2SiteClientProtocol::writeRequestType(RequestType type) -{ - if (type >= MAX_REQUEST_TYPE) - return -1; - - return _peer->writeUTF(RequestTypeStr[type]); -} - -int Site2SiteClientProtocol::readRequestType(RequestType &type) -{ - std::string requestTypeStr; - - int ret = _peer->readUTF(requestTypeStr); - - if (ret <= 0) - return ret; - - for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) - { - if (RequestTypeStr[i] == requestTypeStr) - { - type = (RequestType) i; - return ret; - } - } - - return -1; -} - -int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) -{ - uint8_t firstByte; - - int ret = _peer->read(firstByte); - - if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) - return -1; - - uint8_t secondByte; - - ret = _peer->read(secondByte); - - if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) - return -1; - - uint8_t thirdByte; - - ret = _peer->read(thirdByte); - - if (ret <= 0) - return ret; - - code = (RespondCode) thirdByte; - - RespondCodeContext *resCode = this->getRespondCodeContext(code); - - if ( resCode == NULL) - { - // Not a valid respond code - return -1; - } - if (resCode->hasDescription) - { - ret = _peer->readUTF(message); - if (ret <= 0) - return -1; - } - return 3 + message.size(); -} - -int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) -{ - RespondCodeContext *resCode = this->getRespondCodeContext(code); - - if (resCode == NULL) - { - // Not a valid respond code - return -1; - } - - uint8_t codeSeq[3]; - codeSeq[0] = CODE_SEQUENCE_VALUE_1; - codeSeq[1] = CODE_SEQUENCE_VALUE_2; - codeSeq[2] = (uint8_t) code; - - int ret = _peer->write(codeSeq, 3); - - if (ret != 3) - return -1; - - if (resCode->hasDescription) - { - ret = _peer->writeUTF(message); - if (ret > 0) - return (3 + ret); - else - return ret; - } - else - return 3; -} - -bool Site2SiteClientProtocol::negotiateCodec() -{ - if (_peerState != HANDSHAKED) - { - _logger->log_error("Site2Site peer state is not handshaked while negotiate codec"); - return false; - } - - _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); - - int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); - - if (status <= 0) - { - // tearDown(); - return false; - } - - // Negotiate the codec version - bool ret = initiateCodecResourceNegotiation(); - - if (!ret) - { - _logger->log_error("Site2Site Codec Version Negotiation failed"); - /* - _peer->yield(); - tearDown(); */ - return false; - } - - _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer"); - _peerState = READY; - - return true; -} - -bool Site2SiteClientProtocol::bootstrap() -{ - if (_peerState == READY) - return true; - - tearDown(); - - if (establish() && handShake() && negotiateCodec()) - { - _logger->log_info("Site2Site Ready For data transaction"); - return true; - } - else - { - _peer->yield(); - tearDown(); - return false; - } -} - -Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) -{ - int ret; - bool dataAvailable; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return NULL; - } - - if (direction == RECEIVE) - { - ret = writeRequestType(RECEIVE_FLOWFILES); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - - RespondCode code; - std::string message; - - ret = readRespond(code, message); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - - switch (code) - { - case MORE_DATA: - dataAvailable = true; - _logger->log_info("Site2Site peer indicates that data is available"); - transaction = new Transaction(direction); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - case NO_MORE_DATA: - dataAvailable = false; - _logger->log_info("Site2Site peer indicates that no data is available"); - transaction = new Transaction(direction); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - default: - _logger->log_info("Site2Site got unexpected response %d when asking for data", code); - // tearDown(); - return NULL; - } - } - else - { - ret = writeRequestType(SEND_FLOWFILES); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - else - { - transaction = new Transaction(direction); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - } - } -} - -bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) - { - _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); - return false; - } - - if (transaction->getDirection() != RECEIVE) - { - _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); - return false; - } - - if (!transaction->isDataAvailable()) - { - eof = true; - return true; - } - - if (transaction->_transfers > 0) - { - // if we already has transfer before, check to see whether another one is available - RespondCode code; - std::string message; - - ret = readRespond(code, message); - - if (ret <= 0) - { - return false; - } - if (code == CONTINUE_TRANSACTION) - { - _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); - transaction->_dataAvailable = true; - } - else if (code == FINISH_TRANSACTION) - { - _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); - transaction->_dataAvailable = false; - } - else - { - _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); - return false; - } - } - - if (!transaction->isDataAvailable()) - { - eof = true; - return true; - } - - // start to read the packet - uint32_t numAttributes; - ret = _peer->read(numAttributes, &transaction->_crc); - if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) - { - return false; - } - - // read the attributes - for (unsigned int i = 0; i < numAttributes; i++) - { - std::string key; - std::string value; - ret = _peer->readUTF(key, true, &transaction->_crc); - if (ret <= 0) - { - return false; - } - ret = _peer->readUTF(value, true, &transaction->_crc); - if (ret <= 0) - { - return false; - } - packet->_attributes[key] = value; - _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); - } - - uint64_t len; - ret = _peer->read(len, &transaction->_crc); - if (ret <= 0) - { - return false; - } - - packet->_size = len; - transaction->_transfers++; - transaction->_state = DATA_EXCHANGED; - transaction->_bytes += len; - _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), - transaction->_transfers, transaction->_bytes); - - return true; -} - -bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) - { - _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); - return false; - } - - if (transaction->getDirection() != SEND) - { - _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); - return false; - } - - if (transaction->_transfers > 0) - { - ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); - if (ret <= 0) - { - return false; - } - } - - // start to read the packet - uint32_t numAttributes = packet->_attributes.size(); - ret = _peer->write(numAttributes, &transaction->_crc); - if (ret != 4) - { - return false; - } - - std::map<std::string, std::string>::iterator itAttribute; - for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++) - { - ret = _peer->writeUTF(itAttribute->first, true, &transaction->_crc); - if (ret <= 0) - { - return false; - } - ret = _peer->writeUTF(itAttribute->second, true, &transaction->_crc); - if (ret <= 0) - { - return false; - } - _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), - itAttribute->first.c_str(), itAttribute->second.c_str()); - } - - uint64_t len = flowFile->getSize() ; - ret = _peer->write(len, &transaction->_crc); - if (ret != 8) - { - return false; - } - - if (flowFile->getSize()) - { - Site2SiteClientProtocol::ReadCallback callback(packet); - session->read(flowFile, &callback); - if (flowFile->getSize() != packet->_size) - { - return false; - } - } - - transaction->_transfers++; - transaction->_state = DATA_EXCHANGED; - transaction->_bytes += len; - _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), - transaction->_transfers, transaction->_bytes); - - return true; -} - -void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session) -{ - uint64_t bytes = 0; - int transfers = 0; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; - } - - // Create the transaction - std::string transactionID; - transaction = createTransaction(transactionID, RECEIVE); - - if (transaction == NULL) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; - } - - try - { - while (true) - { - std::map<std::string, std::string> empty; - DataPacket packet(this, transaction, empty); - bool eof = false; - - if (!receive(transactionID, &packet, eof)) - { - throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); - return; - } - if (eof) - { - // transaction done - break; - } - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - { - throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); - return; - } - std::map<std::string, std::string>::iterator it; - for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++) - { - flowFile->addAttribute(it->first, it->second); - } - - if (packet._size > 0) - { - Site2SiteClientProtocol::WriteCallback callback(&packet); - session->write(flowFile, &callback); - if (flowFile->getSize() != packet._size) - { - throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); - return; - } - } - Relationship relation; // undefined relationship - session->transfer(flowFile, relation); - // receive the transfer for the flow record - bytes += packet._size; - transfers++; - } // while true - - if (!confirm(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); - return; - } - if (!complete(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); - return; - } - _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", - transactionID.c_str(), transfers, bytes); - // we yield the receive if we did not get anything - if (transfers == 0) - context->yield(); - } - catch (std::exception &exception) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); - throw; - } - - deleteTransaction(transactionID); - - return; -} - -bool Site2SiteClientProtocol::confirm(std::string transactionID) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && - transaction->getDirection() == RECEIVE) - { - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - - if (transaction->getState() != DATA_EXCHANGED) - return false; - - if (transaction->getDirection() == RECEIVE) - { - if (transaction->isDataAvailable()) - return false; - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - long crcValue = transaction->getCRC(); - std::string crc = std::to_string(crcValue); - _logger->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), - transactionID.c_str()); - ret = writeRespond(CONFIRM_TRANSACTION, crc); - if (ret <= 0) - return false; - RespondCode code; - std::string message; - readRespond(code, message); - if (ret <= 0) - return false; - - if (code == CONFIRM_TRANSACTION) - { - _logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else if (code == BAD_CHECKSUM) - { - _logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); - /* - transaction->_state = TRANSACTION_CONFIRMED; - return true; */ - return false; - } - else - { - _logger->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - } - else - { - _logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", - transactionID.c_str()); - ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); - if (ret <= 0) - return false; - RespondCode code; - std::string message; - readRespond(code, message); - if (ret <= 0) - return false; - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - if (code == CONFIRM_TRANSACTION) - { - _logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); - if (this->_currentVersion > 3) - { - long crcValue = transaction->getCRC(); - std::string crc = std::to_string(crcValue); - if (message == crc) - { - _logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else - { - _logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); - ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); - /* - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; */ - return false; - } - } - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else - { - _logger->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - return false; - } -} - -void Site2SiteClientProtocol::cancel(std::string transactionID) -{ - Transaction *transaction = NULL; - - if (_peerState != READY) - { - return; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } - - if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED - || transaction->getState() == TRANSACTION_ERROR) - { - return; - } - - this->writeRespond(CANCEL_TRANSACTION, "Cancel"); - transaction->_state = TRANSACTION_CANCELED; - - tearDown(); - return; -} - -void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) -{ - Transaction *transaction = NULL; - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } - - _logger->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); - delete transaction; - _transactionMap.erase(transactionID); -} - -void Site2SiteClientProtocol::error(std::string transactionID) -{ - Transaction *transaction = NULL; - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } - - transaction->_state = TRANSACTION_ERROR; - tearDown(); - return; -} - -//! Complete the transaction -bool Site2SiteClientProtocol::complete(std::string transactionID) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_CONFIRMED) - { - return false; - } - - if (transaction->getDirection() == RECEIVE) - { - if (transaction->_transfers == 0) - { - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - else - { - _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str()); - ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); - if (ret <= 0) - return false; - else - { - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - } - } - else - { - RespondCode code; - std::string message; - int ret; - - ret = readRespond(code, message); - - if (ret <= 0) - return false; - - if (code == TRANSACTION_FINISHED) - { - _logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - else - { - _logger->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - } -} - -void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session) -{ - FlowFileRecord *flow = session->get(); - Transaction *transaction = NULL; - - if (!flow) - return; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; - } - - // Create the transaction - std::string transactionID; - transaction = createTransaction(transactionID, SEND); - - if (transaction == NULL) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; - } - - bool continueTransaction = true; - uint64_t startSendingNanos = getTimeNano(); - - try - { - while (continueTransaction) - { - DataPacket packet(this, transaction, flow->getAttributes()); - - if (!send(transactionID, &packet, flow, session)) - { - throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); - return; - } - _logger->log_info("Site2Site transaction %s send flow record %s", - transactionID.c_str(), flow->getUUIDStr().c_str()); - session->remove(flow); - - uint64_t transferNanos = getTimeNano() - startSendingNanos; - if (transferNanos > _batchSendNanos) - break; - - flow = session->get(); - if (!flow) - { - continueTransaction = false; - } - } // while true - - if (!confirm(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); - return; - } - if (!complete(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); - return; - } - _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); - } - catch (std::exception &exception) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); - throw; - } - - deleteTransaction(transactionID); - - return; -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp deleted file mode 100644 index 48e19d0..0000000 --- a/src/Site2SitePeer.cpp +++ /dev/null @@ -1,435 +0,0 @@ -/** - * @file Site2SitePeer.cpp - * Site2SitePeer class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <sys/time.h> -#include <stdio.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include <netinet/tcp.h> -#include <iostream> -#include "Site2SitePeer.h" - -//! CRC tables -std::atomic<bool> CRC32::tableInit(false); -unsigned int CRC32::table[256]; - -bool Site2SitePeer::Open() -{ - in_addr_t addr; - int sock = 0; - struct hostent *h; - const char *host; - uint16_t port; - - host = this->_host.c_str(); - port = this->_port; - - if (strlen(host) == 0) - return false; - -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) - { - _logger->log_error("Could not create socket to hostName %s", host); - this->yield(); - return false; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - this->yield(); - return false; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - this->yield(); - return false; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - _logger->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - this->yield(); - return false; - } - int rcvsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0) - { - _logger->log_error("setsockopt() SO_RCVBUF failed"); - close(sock); - this->yield(); - return false; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) - { - _logger->log_error("socket bind failed"); - close(sock); - this->yield(); - return false; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *)&sa, socklen); - - if (status < 0) - { - _logger->log_error("socket connect failed to %s %d", host, port); - close(sock); - this->yield(); - return false; - } - - _logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port); - - _socket = sock; - - status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES)); - - if (status <= 0) - { - Close(); - return false; - } - - return true; -} - -void Site2SitePeer::Close() -{ - if (_socket) - { - _logger->log_info("Site2Site Peer socket %d close", _socket); - close(_socket); - _socket = 0; - } -} - -int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc) -{ - int ret = 0, bytes = 0; - - if (_socket <= 0) - { - // this->yield(); - return -1; - } - - while (bytes < buflen) - { - ret = send(_socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - _logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno)); - Close(); - // this->yield(); - return ret; - } - bytes+=ret; - } - - if (crc) - crc->update(buf, buflen); - - return bytes; -} - -int Site2SitePeer::Select(int msec) -{ - fd_set fds; - struct timeval tv; - int retval; - int fd = _socket; - - FD_ZERO(&fds); - FD_SET(fd, &fds); - - tv.tv_sec = msec/1000; - tv.tv_usec = (msec % 1000) * 1000; - - if (msec > 0) - retval = select(fd+1, &fds, NULL, NULL, &tv); - else - retval = select(fd+1, &fds, NULL, NULL, NULL); - - if (retval <= 0) - return retval; - if (FD_ISSET(fd, &fds)) - return retval; - else - return 0; -} - -int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc) -{ - int sendSize = buflen; - uint8_t *start = buf; - - if (_socket <= 0) - { - // this->yield(); - return -1; - } - - while (buflen) - { - int status; - status = Select((int) _timeOut); - if (status <= 0) - { - Close(); - return status; - } - status = recv(_socket, buf, buflen, 0); - if (status <= 0) - { - Close(); - // this->yield(); - return status; - } - buflen -= status; - buf += status; - } - - if (crc) - crc->update(start, sendSize); - - return sendSize; -} - -int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc) -{ - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) - return -1; - - uint8_t *bytearr = NULL; - if (!widen) - { - bytearr = new uint8_t[utflen+2]; - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - else - { - bytearr = new uint8_t[utflen+4]; - bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); - bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); - } - - int i=0; - for (i=0; i<strlen; i++) { - c = str.at(i); - if (!((c >= 0x0001) && (c <= 0x007F))) break; - bytearr[count++] = (uint8_t) c; - } - - for (;i < strlen; i++){ - c = str.at(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (uint8_t) c; - } else if (c > 0x07FF) { - bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (uint8_t) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); - } - } - int ret; - if (!widen) - { - ret = sendData(bytearr, utflen+2, crc); - } - else - { - ret = sendData(bytearr, utflen+4, crc); - } - delete[] bytearr; - return ret; -} - -int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc) -{ - uint16_t utflen; - int ret; - - if (!widen) - { - ret = read(utflen, crc); - if (ret <= 0) - return ret; - } - else - { - uint32_t len; - ret = read(len, crc); - if (ret <= 0) - return ret; - utflen = len; - } - - uint8_t *bytearr = NULL; - char *chararr = NULL; - bytearr = new uint8_t[utflen]; - chararr = new char[utflen]; - memset(chararr, 0, utflen); - - int c, char2, char3; - int count = 0; - int chararr_count=0; - - ret = read(bytearr, utflen, crc); - if (ret <= 0) - { - delete[] bytearr; - delete[] chararr; - return ret; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) break; - count++; - chararr[chararr_count++]=(char)c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: - /* 0xxxxxxx*/ - count++; - chararr[chararr_count++]=(char)c; - break; - case 12: case 13: - /* 110x xxxx 10xx xxxx*/ - count += 2; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-1]; - if ((char2 & 0xC0) != 0x80) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | - (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - char2 = (int) bytearr[count-2]; - char3 = (int) bytearr[count-1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) - { - delete[] bytearr; - delete[] chararr; - return -1; - } - chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | - ((char2 & 0x3F) << 6) | - ((char3 & 0x3F) << 0)); - break; - default: - delete[] bytearr; - delete[] chararr; - return -1; - } - } - // The number of chars produced may be less than utflen - std::string value(chararr, chararr_count); - str = value; - delete[] bytearr; - delete[] chararr; - if (!widen) - return (2 + utflen); - else - return (4 + utflen); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/TailFile.cpp ---------------------------------------------------------------------- diff --git a/src/TailFile.cpp b/src/TailFile.cpp deleted file mode 100644 index 445255b..0000000 --- a/src/TailFile.cpp +++ /dev/null @@ -1,272 +0,0 @@ -/** - * @file TailFile.cpp - * TailFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <time.h> -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <dirent.h> -#include <limits.h> -#include <unistd.h> - -#include "TimeUtil.h" -#include "TailFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string TailFile::ProcessorName("TailFile"); -Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", ""); -Property TailFile::StateFile("State File", - "Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", ""); -Relationship TailFile::Success("success", "All files are routed to success"); - -void TailFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FileName); - properties.insert(StateFile); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -std::string TailFile::trimLeft(const std::string& s) -{ - const char *WHITESPACE = " \n\r\t"; - size_t startpos = s.find_first_not_of(WHITESPACE); - return (startpos == std::string::npos) ? "" : s.substr(startpos); -} - -std::string TailFile::trimRight(const std::string& s) -{ - const char *WHITESPACE = " \n\r\t"; - size_t endpos = s.find_last_not_of(WHITESPACE); - return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1); -} - -void TailFile::parseStateFileLine(char *buf) -{ - char *line = buf; - - while ((line[0] == ' ') || (line[0] =='\t')) - ++line; - - char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) - { - return; - } - - char *equal = strchr(line, '='); - if (equal == NULL) - { - return; - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first== '\n')) - { - return; - } - - std::string value = equal; - key = trimRight(key); - value = trimRight(value); - - if (key == "FILENAME") - this->_currentTailFileName = value; - if (key == "POSITION") - this->_currentTailFilePosition = std::stoi(value); - - return; -} - -void TailFile::recoverState() -{ - std::ifstream file(_stateFile.c_str(), std::ifstream::in); - if (!file.good()) - { - _logger->log_error("load state file failed %s", _stateFile.c_str()); - return; - } - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize)) - { - parseStateFileLine(buf); - } -} - -void TailFile::storeState() -{ - std::ofstream file(_stateFile.c_str()); - if (!file.is_open()) - { - _logger->log_error("store state file failed %s", _stateFile.c_str()); - return; - } - file << "FILENAME=" << this->_currentTailFileName << "\n"; - file << "POSITION=" << this->_currentTailFilePosition << "\n"; - file.close(); -} - -static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) -{ - return (i.modifiedTime < j.modifiedTime); -} -void TailFile::checkRollOver() -{ - struct stat statbuf; - std::vector<TailMatchedFileItem> matchedFiles; - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; - - if (stat(fullPath.c_str(), &statbuf) == 0) - { - if (statbuf.st_size > this->_currentTailFilePosition) - // there are new input for the current tail file - return; - - uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); - std::string pattern = _fileName; - std::size_t found = _fileName.find_last_of("."); - if (found != std::string::npos) - pattern = _fileName.substr(0,found); - DIR *d; - d = opendir(this->_fileLocation.c_str()); - if (!d) - return; - while (1) - { - struct dirent *entry; - entry = readdir(d); - if (!entry) - break; - std::string d_name = entry->d_name; - if (!(entry->d_type & DT_DIR)) - { - std::string fileName = d_name; - std::string fileFullName = this->_fileLocation + "/" + d_name; - if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) - { - if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) - { - TailMatchedFileItem item; - item.fileName = fileName; - item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); - matchedFiles.push_back(item); - } - } - } - } - closedir(d); - - // Sort the list based on modified time - std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); - for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it!=matchedFiles.end(); ++it) - { - TailMatchedFileItem item = *it; - if (item.fileName == _currentTailFileName) - { - ++it; - if (it!=matchedFiles.end()) - { - TailMatchedFileItem nextItem = *it; - _logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str()); - _currentTailFileName = nextItem.fileName; - _currentTailFilePosition = 0; - storeState(); - } - break; - } - } - } - else - return; -} - - -void TailFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - if (context->getProperty(FileName.getName(), value)) - { - std::size_t found = value.find_last_of("/\\"); - this->_fileLocation = value.substr(0,found); - this->_fileName = value.substr(found+1); - } - if (context->getProperty(StateFile.getName(), value)) - { - _stateFile = value; - } - if (!this->_stateRecovered) - { - _stateRecovered = true; - this->_currentTailFileName = _fileName; - this->_currentTailFilePosition = 0; - // recover the state if we have not done so - this->recoverState(); - } - checkRollOver(); - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; - struct stat statbuf; - if (stat(fullPath.c_str(), &statbuf) == 0) - { - if (statbuf.st_size <= this->_currentTailFilePosition) - // there are no new input for the current tail file - { - context->yield(); - return; - } - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - std::size_t found = _currentTailFileName.find_last_of("."); - std::string baseName = _currentTailFileName.substr(0,found); - std::string extension = _currentTailFileName.substr(found+1); - flowFile->updateAttribute(PATH, _fileLocation); - flowFile->addAttribute(ABSOLUTE_PATH, fullPath); - session->import(fullPath, flowFile, true, this->_currentTailFilePosition); - session->transfer(flowFile, Success); - _logger->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize()); - std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + - std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension; - flowFile->updateAttribute(FILENAME, logName); - this->_currentTailFilePosition += flowFile->getSize(); - storeState(); - } -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/src/TimerDrivenSchedulingAgent.cpp b/src/TimerDrivenSchedulingAgent.cpp deleted file mode 100644 index 3ce57ae..0000000 --- a/src/TimerDrivenSchedulingAgent.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/** - * @file TimerDrivenSchedulingAgent.cpp - * TimerDrivenSchedulingAgent class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <chrono> -#include <thread> -#include <iostream> -#include "Property.h" -#include "TimerDrivenSchedulingAgent.h" - -void TimerDrivenSchedulingAgent::schedule(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - _administrativeYieldDuration = 0; - std::string yieldValue; - - if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue)) - { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration)) - { - _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); - } - } - - _boredYieldDuration = 0; - if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue)) - { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration)) - { - _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); - } - } - - if (processor->getScheduledState() != RUNNING) - { - _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - if (it != _threads.end()) - { - _logger->log_info("Can not schedule threads for processor %s because there are existed thread running"); - return; - } - - std::vector<std::thread *> threads; - for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) - { - std::thread *thread = new std::thread(run, this, processor); - thread->detach(); - threads.push_back(thread); - _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(), - processor->getName().c_str()); - } - _threads[processor->getUUIDStr().c_str()] = threads; - - return; -} - -void TimerDrivenSchedulingAgent::unschedule(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (processor->getScheduledState() != RUNNING) - { - _logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - - if (it == _threads.end()) - { - _logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running"); - return; - } - for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) - { - std::thread *thread = *itThread; - _logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(), - processor->getName().c_str()); - delete thread; - } - _threads.erase(processor->getUUIDStr()); - processor->clearActiveTask(); - - return; -} - -void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor) -{ - while (agent->_running) - { - bool shouldYield = agent->onTrigger(processor); - - if (processor->isYield()) - { - // Honor the yield - std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); - } - else if (shouldYield && agent->_boredYieldDuration > 0) - { - // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration)); - } - std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); - } - return; -} - - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/test/FlowFileRecordTest.cpp ---------------------------------------------------------------------- diff --git a/test/FlowFileRecordTest.cpp b/test/FlowFileRecordTest.cpp deleted file mode 100644 index 09a3d33..0000000 --- a/test/FlowFileRecordTest.cpp +++ /dev/null @@ -1,28 +0,0 @@ -/** - * @file MiNiFiMain.cpp - * MiNiFiMain implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> - -#include "FlowFileRecord.h" - -int main(int argc, char **argv) -{ -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/test/Server.cpp ---------------------------------------------------------------------- diff --git a/test/Server.cpp b/test/Server.cpp deleted file mode 100644 index e7b3452..0000000 --- a/test/Server.cpp +++ /dev/null @@ -1,607 +0,0 @@ -/* A simple server in the internet domain using TCP - The port number is passed as an argument */ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <errno.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include <chrono> -#include <thread> -#include <iostream> // std::cout -#include <fstream> // std::ifstream -#include <signal.h> - -#define DEFAULT_NIFI_SERVER_PORT 9000 -#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec -#define MAX_READ_TIMEOUT 30000 // 30 seconds - -//! FlowControl Protocol Msg Type -typedef enum { - REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version - REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval - REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info - REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property - MAX_FLOW_CONTROL_MSG_TYPE -} FlowControlMsgType; - -//! FlowControl Protocol Msg Type String -static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = -{ - "REGISTER_REQ", - "REGISTER_RESP", - "REPORT_REQ", - "REPORT_RESP" -}; - -//! Flow Control Msg Type to String -inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) -{ - if (type < MAX_FLOW_CONTROL_MSG_TYPE) - return FlowControlMsgTypeStr[type]; - else - return NULL; -} - -//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) -typedef enum { - //Fix length 8 bytes: client to server in register request, required field - FLOW_SERIAL_NUMBER, - // Flow XML name TLV: client to server in register request and report request, required field - FLOW_XML_NAME, - // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server - FLOW_XML_CONTENT, - // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field - REPORT_INTERVAL, - // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROCESSOR_NAME, - // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_NAME, - // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_VALUE, - // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server - REPORT_BLOB, - MAX_FLOW_MSG_ID -} FlowControlMsgID; - -//! FlowControl Protocol Msg ID String -static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = -{ - "FLOW_SERIAL_NUMBER", - "FLOW_XML_NAME", - "FLOW_XML_CONTENT", - "REPORT_INTERVAL", - "PROCESSOR_NAME" - "PROPERTY_NAME", - "PROPERTY_VALUE", - "REPORT_BLOB" -}; - -#define TYPE_HDR_LEN 4 // Fix Hdr Type -#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes - -//! FlowControl Protocol Msg Len -inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) -{ - if (id == FLOW_SERIAL_NUMBER) - return (TYPE_HDR_LEN + 8); - else if (id == REPORT_INTERVAL) - return (TYPE_HDR_LEN + 4); - else if (id < MAX_FLOW_MSG_ID) - return (TLV_HDR_LEN + payLoadLen); - else - return -1; -} - -//! Flow Control Msg Id to String -inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) -{ - if (id < MAX_FLOW_MSG_ID) - return FlowControlMsgIDStr[id]; - else - return NULL; -} - -//! Flow Control Respond status code -typedef enum { - RESP_SUCCESS, - RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register - RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller - RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller - RESP_FAILURE, - MAX_RESP_CODE -} FlowControlRespCode; - -//! FlowControl Resp Code str -static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = -{ - "RESP_SUCCESS", - "RESP_TRIGGER_REGISTER", - "RESP_START_FLOW_CONTROLLER", - "RESP_STOP_FLOW_CONTROLLER", - "RESP_FAILURE" -}; - -//! Flow Control Resp Code to String -inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) -{ - if (code < MAX_RESP_CODE) - return FlowControlRespCodeStr[code]; - else - return NULL; -} - -//! Common FlowControlProtocol Header -typedef struct { - uint32_t msgType; //! Msg Type - uint32_t seqNumber; //! Seq Number to match Req with Resp - uint32_t status; //! Resp Code, see FlowControlRespCode - uint32_t payloadLen; //! Msg Payload length -} FlowControlProtocolHeader; - - -//! encode uint32_t -uint8_t *encode(uint8_t *buf, uint32_t value) -{ - *buf++ = (value & 0xFF000000) >> 24; - *buf++ = (value & 0x00FF0000) >> 16; - *buf++ = (value & 0x0000FF00) >> 8; - *buf++ = (value & 0x000000FF); - return buf; -} - -//! encode uint32_t -uint8_t *decode(uint8_t *buf, uint32_t &value) -{ - value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); - return (buf + 4); -} - -//! encode byte array -uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) -{ - memcpy(buf, bufArray, size); - buf += size; - return buf; -} - -//! encode std::string -uint8_t *encode(uint8_t *buf, std::string value) -{ - // add the \0 for size - buf = encode(buf, value.size()+1); - buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); - return buf; -} - -int sendData(int socket, uint8_t *buf, int buflen) -{ - int ret = 0, bytes = 0; - - while (bytes < buflen) - { - ret = send(socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - return ret; - } - bytes+=ret; - } - - return bytes; -} - -void error(const char *msg) -{ - perror(msg); - exit(1); -} - -/* readline - read a '\n' terminated line from socket fd - into buffer bufptr of size len. The line in the - buffer is terminated with '\0'. - It returns -1 in case of error or if - the capacity of the buffer is exceeded. - It returns 0 if EOF is encountered before reading '\n'. - */ -int readline( int fd, char *bufptr, size_t len ) -{ - /* Note that this function is very tricky. It uses the - static variables bp, cnt, and b to establish a local buffer. - The recv call requests large chunks of data (the size of the buffer). - Then if the recv call reads more than one line, the overflow - remains in the buffer and it is made available to the next call - to readline. - Notice also that this routine reads up to '\n' and overwrites - it with '\0'. Thus if the line is really terminated with - "\r\n", the '\r' will remain unchanged. - */ - char *bufx = bufptr; - static char *bp; - static int cnt = 0; - static char b[ 4096 ]; - char c; - - while ( --len > 0 ) - { - if ( --cnt <= 0 ) - { - cnt = recv( fd, b, sizeof( b ), 0 ); - if ( cnt < 0 ) - { - if ( errno == EINTR ) - { - len++; /* the while will decrement */ - continue; - } - return -1; - } - if ( cnt == 0 ) - return 0; - bp = b; - } - c = *bp++; - *bufptr++ = c; - if ( c == '\n' ) - { - *bufptr = '\0'; - return bufptr - bufx; - } - } - return -1; -} - -int readData(int socket, uint8_t *buf, int buflen) -{ - int sendSize = buflen; - int status; - - while (buflen) - { -#ifndef __MACH__ - status = read(socket, buf, buflen); -#else - status = recv(socket, buf, buflen, 0); -#endif - if (status <= 0) - { - return status; - } - buflen -= status; - buf += status; - } - - return sendSize; -} - -int readHdr(int socket, FlowControlProtocolHeader *hdr) -{ - uint8_t buffer[sizeof(FlowControlProtocolHeader)]; - - uint8_t *data = buffer; - - int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader)); - if (status <= 0) - return status; - - uint32_t value; - data = decode(data, value); - hdr->msgType = value; - - data = decode(data, value); - hdr->seqNumber = value; - - data = decode(data, value); - hdr->status = value; - - data = decode(data, value); - hdr->payloadLen = value; - - return sizeof(FlowControlProtocolHeader); -} - -int readXML(char **xmlContent) -{ - std::ifstream is ("conf/flowServer.xml", std::ifstream::binary); - if (is) { - // get length of file: - is.seekg (0, is.end); - int length = is.tellg(); - is.seekg (0, is.beg); - - char * buffer = new char [length]; - - printf("Reading %s len %d\n", "conf/flowServer.xml", length); - // read data as a block: - is.read (buffer,length); - - is.close(); - - // ...buffer contains the entire file... - *xmlContent = buffer; - - return length; - } - return 0; -} - -static int sockfd = 0, newsockfd = 0; -void sigHandler(int signal) -{ - if (signal == SIGINT || signal == SIGTERM) - { - close(newsockfd); - close(sockfd); - exit(1); - } -} - -int main(int argc, char *argv[]) -{ - int portno; - socklen_t clilen; - struct sockaddr_in serv_addr, cli_addr; - char buffer[4096]; - int flag = 0; - int number = 0; - - int n; - if (argc < 2) { - fprintf(stderr,"ERROR, no port provided\n"); - exit(1); - } - - if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) - { - - return -1; - } - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) - error("ERROR opening socket"); - bzero((char *) &serv_addr, sizeof(serv_addr)); - portno = atoi(argv[1]); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - if (bind(sockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) - error("ERROR on binding"); - listen(sockfd,5); - if (portno == DEFAULT_NIFI_SERVER_PORT) - { - while (true) - { - clilen = sizeof(cli_addr); - newsockfd = accept(sockfd, - (struct sockaddr *) &cli_addr, - &clilen); - if (newsockfd < 0) - { - error("ERROR on accept"); - break; - } - // process request - FlowControlProtocolHeader hdr; - int status = readHdr(newsockfd, &hdr); - if (status > 0) - { - printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber); - printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen); - if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) - { - printf("Flow Control Protocol Register Req receive\n"); - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(newsockfd, payload, hdr.payloadLen); - while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID = 0xFFFFFFFF; - payloadPtr = decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER) - { - // Fixed 8 bytes - uint8_t seqNum[8]; - memcpy(seqNum, payloadPtr, 8); - printf("Flow Control Protocol Register Req receive serial num\n"); - payloadPtr += 8; - } - else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) - { - uint32_t len; - payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive XML name length %d\n", len); - std::string flowName = (const char *) payloadPtr; - payloadPtr += len; - printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); - } - else - { - break; - } - } - delete[] payload; - // Send Register Respond - // Calculate the total payload msg size - char *xmlContent; - uint32_t xmlLen = readXML(&xmlContent); - uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); - if (xmlLen > 0) - payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen); - - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - hdr.msgType = REGISTER_RESP; - hdr.payloadLen = payloadSize; - hdr.status = RESP_SUCCESS; - data = encode(data, hdr.msgType); - data = encode(data, hdr.seqNumber); - data = encode(data, hdr.status); - data = encode(data, hdr.payloadLen); - - // encode the report interval - data = encode(data, REPORT_INTERVAL); - data = encode(data, DEFAULT_REPORT_INTERVAL); - - // encode the XML content - if (xmlLen > 0) - { - data = encode(data, FLOW_XML_CONTENT); - data = encode(data, xmlLen); - data = encode(data, (uint8_t *) xmlContent, xmlLen); - delete[] xmlContent; - } - - // send it - status = sendData(newsockfd, start, size); - delete[] start; - } - else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ) - { - printf("Flow Control Protocol Report Req receive\n"); - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(newsockfd, payload, hdr.payloadLen); - while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID = 0xFFFFFFFF; - payloadPtr = decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) - { - uint32_t len; - payloadPtr = decode(payloadPtr, len); - printf("Flow Control Protocol receive XML name length %d\n", len); - std::string flowName = (const char *) payloadPtr; - payloadPtr += len; - printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); - } - else - { - break; - } - } - delete[] payload; - // Send Register Respond - // Calculate the total payload msg size - std::string processor = "RealTimeDataCollector"; - std::string propertyName1 = "real Time Message ID"; - std::string propertyValue1 = "41"; - std::string propertyName2 = "Batch Message ID"; - std::string propertyValue2 = "172,30,48"; - if (flag == 0) - { - propertyName1 = "Real Time Message ID"; - propertyValue1 = "41"; - propertyName2 = "Batch Message ID"; - propertyValue2 = "172,48"; - flag = 1; - } - else if (flag == 1) - { - propertyName1 = "Real Time Message ID"; - propertyValue1 = "172,48"; - propertyName2 = "Batch Message ID"; - propertyValue2 = "41"; - flag = 0; - } - uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1); - payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1); - - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - hdr.msgType = REPORT_RESP; - hdr.payloadLen = payloadSize; - hdr.status = RESP_SUCCESS; - - if (number >= 10 && number < 20) - { - // After 10 second report, stop the flow controller for 10 second - hdr.status = RESP_STOP_FLOW_CONTROLLER; - } - else if (number == 20) - { - // restart the flow controller after 10 second - hdr.status = RESP_START_FLOW_CONTROLLER; - } - else if (number == 30) - { - // retrigger register - hdr.status = RESP_TRIGGER_REGISTER; - number = 0; - } - - number++; - - data = encode(data, hdr.msgType); - data = encode(data, hdr.seqNumber); - data = encode(data, hdr.status); - data = encode(data, hdr.payloadLen); - - // encode the processorName - data = encode(data, PROCESSOR_NAME); - data = encode(data, processor); - - // encode the propertyName and value TLV - data = encode(data, PROPERTY_NAME); - data = encode(data, propertyName1); - data = encode(data, PROPERTY_VALUE); - data = encode(data, propertyValue1); - data = encode(data, PROPERTY_NAME); - data = encode(data, propertyName2); - data = encode(data, PROPERTY_VALUE); - data = encode(data, propertyValue2); - // send it - status = sendData(newsockfd, start, size); - delete[] start; - } - } - close(newsockfd); - } - close(sockfd); - } - else - { - clilen = sizeof(cli_addr); - newsockfd = accept(sockfd, - (struct sockaddr *) &cli_addr, - &clilen); - if (newsockfd < 0) - error("ERROR on accept"); - while (1) - { - bzero(buffer,4096); - n = readline(newsockfd,buffer,4095); - if (n <= 0 ) - { - close(newsockfd); - newsockfd = accept(sockfd, - (struct sockaddr *) &cli_addr, - &clilen); - continue; - } - printf("%s",buffer); - } - close(newsockfd); - close(sockfd); - } - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile ---------------------------------------------------------------------- diff --git a/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile b/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile deleted file mode 100644 index f23f477..0000000 --- a/thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License - -CFLAGS = -Wall -INCLUDES = -I./include - -CPP_FILES := $(wildcard src/*.cpp) -OBJ_FILES := $(addprefix build/,$(notdir $(CPP_FILES:.cpp=.o))) - -all: lib/libyaml-cpp.a - -lib: - mkdir -p ./lib - -build: - mkdir -p ./build - -lib/libyaml-cpp.a: $(OBJ_FILES) - mkdir -p ./lib - ar crs $@ $^ - -build/%.o: src/%.cpp - mkdir -p ./build - g++ -Os $(INCLUDES) $(CC_FLAGS) -c -o $@ $< - -clean: - rm -rf ./lib ./build