http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp deleted file mode 100644 index 011ebcf..0000000 --- a/src/FlowControlProtocol.cpp +++ /dev/null @@ -1,541 +0,0 @@ -/** - * @file FlowControlProtocol.cpp - * FlowControlProtocol class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <sys/time.h> -#include <stdio.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include <netinet/tcp.h> -#include <iostream> -#include "FlowController.h" -#include "FlowControlProtocol.h" - -int FlowControlProtocol::connectServer(const char *host, uint16_t port) -{ - in_addr_t addr; - int sock = 0; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) - { - _logger->log_error("Could not create socket to hostName %s", host); - return 0; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - _logger->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) - { - _logger->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *)&sa, socklen); - - if (status < 0) - { - 
_logger->log_error("socket connect failed to %s %d", host, port); - close(sock); - return 0; - } - - _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); - - return sock; -} - -int FlowControlProtocol::sendData(uint8_t *buf, int buflen) -{ - int ret = 0, bytes = 0; - - while (bytes < buflen) - { - ret = send(_socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - return ret; - } - bytes+=ret; - } - - return bytes; -} - -int FlowControlProtocol::selectClient(int msec) -{ - fd_set fds; - struct timeval tv; - int retval; - int fd = _socket; - - FD_ZERO(&fds); - FD_SET(fd, &fds); - - tv.tv_sec = msec/1000; - tv.tv_usec = (msec % 1000) * 1000; - - if (msec > 0) - retval = select(fd+1, &fds, NULL, NULL, &tv); - else - retval = select(fd+1, &fds, NULL, NULL, NULL); - - if (retval <= 0) - return retval; - if (FD_ISSET(fd, &fds)) - return retval; - else - return 0; -} - -int FlowControlProtocol::readData(uint8_t *buf, int buflen) -{ - int sendSize = buflen; - - while (buflen) - { - int status; - status = selectClient(MAX_READ_TIMEOUT); - if (status <= 0) - { - return status; - } -#ifndef __MACH__ - status = read(_socket, buf, buflen); -#else - status = recv(_socket, buf, buflen, 0); -#endif - if (status <= 0) - { - return status; - } - buflen -= status; - buf += status; - } - - return sendSize; -} - -int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) -{ - uint8_t buffer[sizeof(FlowControlProtocolHeader)]; - - uint8_t *data = buffer; - - int status = readData(buffer, sizeof(FlowControlProtocolHeader)); - if (status <= 0) - return status; - - uint32_t value; - data = this->decode(data, value); - hdr->msgType = value; - - data = this->decode(data, value); - hdr->seqNumber = value; - - data = this->decode(data, value); - hdr->status = value; - - data = this->decode(data, value); - hdr->payloadLen = value; - - return sizeof(FlowControlProtocolHeader); -} - -void 
FlowControlProtocol::start() -{ - if (_reportInterval <= 0) - return; - if (_running) - return; - _running = true; - _logger->log_info("FlowControl Protocol Start"); - _thread = new std::thread(run, this); - _thread->detach(); -} - -void FlowControlProtocol::stop() -{ - if (!_running) - return; - _running = false; - _logger->log_info("FlowControl Protocol Stop"); -} - -void FlowControlProtocol::run(FlowControlProtocol *protocol) -{ - while (protocol->_running) - { - std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); - if (!protocol->_registered) - { - // if it is not register yet - protocol->sendRegisterReq(); - // protocol->_controller->reload("flow.xml"); - } - else - protocol->sendReportReq(); - } - return; -} - -int FlowControlProtocol::sendRegisterReq() -{ - if (_registered) - { - _logger->log_info("Already registered"); - return -1; - } - - uint16_t port = this->_serverPort; - - if (this->_socket <= 0) - this->_socket = connectServer(_serverName.c_str(), port); - - if (this->_socket <= 0) - return -1; - - // Calculate the total payload msg size - uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + - FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - FlowControlProtocolHeader hdr; - hdr.msgType = REGISTER_REQ; - hdr.payloadLen = payloadSize; - hdr.seqNumber = this->_seqNumber; - hdr.status = RESP_SUCCESS; - data = this->encode(data, hdr.msgType); - data = this->encode(data, hdr.seqNumber); - data = this->encode(data, hdr.status); - data = this->encode(data, hdr.payloadLen); - - // encode the serial number - data = this->encode(data, FLOW_SERIAL_NUMBER); - data = this->encode(data, this->_serialNumber, 8); - - // encode the XML name - data = this->encode(data, FLOW_XML_NAME); - data = this->encode(data, 
this->_controller->getName()); - - // send it - int status = sendData(start, size); - delete[] start; - if (status <= 0) - { - close(_socket); - _socket = 0; - _logger->log_error("Flow Control Protocol Send Register Req failed"); - return -1; - } - - // Looking for register respond - status = readHdr(&hdr); - - if (status <= 0) - { - close(_socket); - _socket = 0; - _logger->log_error("Flow Control Protocol Read Register Resp header failed"); - return -1; - } - _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); - - if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) - { - this->_registered = true; - this->_seqNumber++; - _logger->log_info("Flow Control Protocol Register success"); - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(payload, hdr.payloadLen); - if (status <= 0) - { - delete[] payload; - _logger->log_info("Flow Control Protocol Register Read Payload fail"); - close(_socket); - _socket = 0; - return -1; - } - while (payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID; - payloadPtr = this->decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) - { - // Fixed 4 bytes - uint32_t reportInterval; - payloadPtr = this->decode(payloadPtr, reportInterval); - _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); - this->_reportInterval = reportInterval; - } - else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT) - { - uint32_t xmlLen; - payloadPtr = this->decode(payloadPtr, xmlLen); - _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen); - 
time_t rawtime; - struct tm *timeinfo; - time(&rawtime); - timeinfo = localtime(&rawtime); - std::string xmlFileName = "flow."; - xmlFileName += asctime(timeinfo); - xmlFileName += ".xml"; - std::ofstream fs; - fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) - { - fs.write((const char *)payloadPtr, xmlLen); - fs.close(); - this->_controller->reload(xmlFileName.c_str()); - } - } - else - { - break; - } - } - delete[] payload; - close(_socket); - _socket = 0; - return 0; - } - else - { - _logger->log_info("Flow Control Protocol Register fail"); - close(_socket); - _socket = 0; - return -1; - } -} - - -int FlowControlProtocol::sendReportReq() -{ - uint16_t port = this->_serverPort; - - if (this->_socket <= 0) - this->_socket = connectServer(_serverName.c_str(), port); - - if (this->_socket <= 0) - return -1; - - // Calculate the total payload msg size - uint32_t payloadSize = - FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); - uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; - - uint8_t *data = new uint8_t[size]; - uint8_t *start = data; - - // encode the HDR - FlowControlProtocolHeader hdr; - hdr.msgType = REPORT_REQ; - hdr.payloadLen = payloadSize; - hdr.seqNumber = this->_seqNumber; - hdr.status = RESP_SUCCESS; - data = this->encode(data, hdr.msgType); - data = this->encode(data, hdr.seqNumber); - data = this->encode(data, hdr.status); - data = this->encode(data, hdr.payloadLen); - - // encode the XML name - data = this->encode(data, FLOW_XML_NAME); - data = this->encode(data, this->_controller->getName()); - - // send it - int status = sendData(start, size); - delete[] start; - if (status <= 0) - { - close(_socket); - _socket = 0; - _logger->log_error("Flow Control Protocol Send Report Req failed"); - return -1; - } - - // Looking for report respond - status = readHdr(&hdr); - - if (status <= 0) - { - close(_socket); - _socket = 0; - 
_logger->log_error("Flow Control Protocol Read Report Resp header failed"); - return -1; - } - _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); - _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); - _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); - _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); - - if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) - { - this->_seqNumber++; - uint8_t *payload = new uint8_t[hdr.payloadLen]; - uint8_t *payloadPtr = payload; - status = readData(payload, hdr.payloadLen); - if (status <= 0) - { - delete[] payload; - _logger->log_info("Flow Control Protocol Report Resp Read Payload fail"); - close(_socket); - _socket = 0; - return -1; - } - std::string processor; - std::string propertyName; - std::string propertyValue; - while (payloadPtr < (payload + hdr.payloadLen)) - { - uint32_t msgID; - payloadPtr = this->decode(payloadPtr, msgID); - if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - processor = (const char *) payloadPtr; - payloadPtr += len; - _logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str()); - } - else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - propertyName = (const char *) payloadPtr; - payloadPtr += len; - _logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str()); - } - else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) - { - uint32_t len; - payloadPtr = this->decode(payloadPtr, len); - propertyValue = (const char *) payloadPtr; - payloadPtr += len; - _logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str()); - 
this->_controller->updatePropertyValue(processor, propertyName, propertyValue); - } - else - { - break; - } - } - delete[] payload; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) - { - _logger->log_info("Flow Control Protocol trigger reregister"); - this->_registered = false; - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) - { - _logger->log_info("Flow Control Protocol stop flow controller"); - this->_controller->stop(true); - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) - { - _logger->log_info("Flow Control Protocol start flow controller"); - this->_controller->start(); - this->_seqNumber++; - close(_socket); - _socket = 0; - return 0; - } - else - { - _logger->log_info("Flow Control Protocol Report fail"); - close(_socket); - _socket = 0; - return -1; - } -} -
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/src/FlowController.cpp b/src/FlowController.cpp deleted file mode 100644 index 8fbe3dc..0000000 --- a/src/FlowController.cpp +++ /dev/null @@ -1,1190 +0,0 @@ -/** - * @file FlowController.cpp - * FlowController class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <libxml/parser.h> -#include <libxml/tree.h> - -#include "FlowController.h" -#include "ProcessContext.h" - -FlowController::FlowController(std::string name) -: _name(name) -{ - uuid_generate(_uuid); - - // Setup the default values - _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME; - _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD; - _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD; - _running = false; - _initialized = false; - _root = NULL; - _logger = Logger::getLogger(); - _protocol = new FlowControlProtocol(this); - - // NiFi config properties - _configure = Configure::getConfigure(); - - std::string rawConfigFileString; - _configure->get(Configure::nifi_flow_configuration_file, rawConfigFileString); - - if (!rawConfigFileString.empty()) - { - _configurationFileName = rawConfigFileString; - } - - char *path = NULL; - char full_path[PATH_MAX]; - - std::string adjustedFilename; - if (!_configurationFileName.empty()) - { - // perform a naive determination if this is a relative path - if (_configurationFileName.c_str()[0] != '/') - { - adjustedFilename = adjustedFilename + _configure->getHome() + "/" + _configurationFileName; - } - else - { - adjustedFilename = _configurationFileName; - } - } - - path = realpath(adjustedFilename.c_str(), full_path); - if (!path) - { - _logger->log_error("Could not locate path from provided configuration file name."); - } - - std::string pathString(path); - _configurationFileName = pathString; - _logger->log_info("FlowController NiFi Configuration file %s", pathString.c_str()); - - // Create repos for flow record and provenance - - _logger->log_info("FlowController %s created", _name.c_str()); -} - -FlowController::~FlowController() -{ - stop(true); - unload(); - delete _protocol; -} - -bool FlowController::isRunning() -{ - return (_running); -} - 
-bool FlowController::isInitialized() -{ - return (_initialized); -} - -void FlowController::stop(bool force) -{ - if (_running) - { - _logger->log_info("Stop Flow Controller"); - this->_timerScheduler.stop(); - // Wait for sometime for thread stop - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - if (this->_root) - this->_root->stopProcessing(&this->_timerScheduler); - _running = false; - } -} - -void FlowController::unload() -{ - if (_running) - { - stop(true); - } - if (_initialized) - { - _logger->log_info("Unload Flow Controller"); - if (_root) - delete _root; - _root = NULL; - _initialized = false; - _name = ""; - } - - return; -} - -void FlowController::reload(std::string xmlFile) -{ - _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str()); - stop(true); - unload(); - std::string oldxmlFile = this->_configurationFileName; - this->_configurationFileName = xmlFile; - load(ConfigFormat::XML); - start(); - if (!this->_root) - { - this->_configurationFileName = oldxmlFile; - _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str()); - stop(true); - unload(); - load(ConfigFormat::XML); - start(); - } -} - -Processor *FlowController::createProcessor(std::string name, uuid_t uuid) -{ - Processor *processor = NULL; - if (name == GenerateFlowFile::ProcessorName) - { - processor = new GenerateFlowFile(name, uuid); - } - else if (name == LogAttribute::ProcessorName) - { - processor = new LogAttribute(name, uuid); - } - else if (name == RealTimeDataCollector::ProcessorName) - { - processor = new RealTimeDataCollector(name, uuid); - } - else if (name == GetFile::ProcessorName) - { - processor = new GetFile(name, uuid); - } - else if (name == TailFile::ProcessorName) - { - processor = new TailFile(name, uuid); - } - else if (name == ListenSyslog::ProcessorName) - { - processor = new ListenSyslog(name, uuid); - } - else if (name == ExecuteProcess::ProcessorName) - { - processor = new ExecuteProcess(name, uuid); 
- } - else - { - _logger->log_error("No Processor defined for %s", name.c_str()); - return NULL; - } - - //! initialize the processor - processor->initialize(); - - return processor; -} - -ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid); -} - -ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, uuid_t uuid) -{ - return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid); -} - -Connection *FlowController::createConnection(std::string name, uuid_t uuid) -{ - return new Connection(name, uuid); -} - -void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) -{ - uuid_t uuid; - xmlNode *currentNode; - Connection *connection = NULL; - - if (!parent) - { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } - - // generate the random UIID - uuid_generate(uuid); - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) - { - if (currentNode->type == XML_ELEMENT_NODE) - { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) - { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseConnection: name => [%s]", name); - connection = this->createConnection(name, uuid); - if (connection == NULL) { - xmlFree(name); - return; - } - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "sourceId") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: sourceId => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - if (connection) - connection->setSourceProcessorUUID(uuid); - } - } else if (xmlStrcmp(currentNode->name, 
BAD_CAST "destinationId") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseConnection: destinationId => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - if (connection) - connection->setDestinationProcessorUUID(uuid); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueSize") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxWorkQueueSize = 0; - if (temp) { - if (Property::StringToInt(temp, maxWorkQueueSize)) { - _logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", maxWorkQueueSize); - if (connection) - connection->setMaxQueueSize(maxWorkQueueSize); - - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxWorkQueueDataSize") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxWorkQueueDataSize = 0; - if (temp) { - if (Property::StringToInt(temp, maxWorkQueueDataSize)) { - _logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", maxWorkQueueDataSize); - if (connection) - connection->setMaxQueueDataSize(maxWorkQueueDataSize); - - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "relationship") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string relationshipName = temp; - if (!relationshipName.empty()) { - Relationship relationship(relationshipName, ""); - _logger->log_debug("parseConnection: relationship => [%s]", relationshipName.c_str()); - if (connection) - connection->setRelationship(relationship); - } else { - Relationship empty; - _logger->log_debug("parseConnection: relationship => [%s]", empty.getName().c_str()); - if (connection) - connection->setRelationship(empty); - } - xmlFree(temp); - } - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node - - if (connection) - parent->addConnection(connection); - - return; -} - -void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) { - uuid_t uuid; - 
xmlNode *currentNode; - ProcessGroup *group = NULL; - - // generate the random UIID - uuid_generate(uuid); - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseRootProcessGroup: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseRootProcessGroup: name => [%s]", name); - group = this->createRootProcessGroup(name, uuid); - if (group == NULL) { - xmlFree(name); - return; - } - // Set the root process group - this->_root = group; - this->_name = name; - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "processor") == 0) { - this->parseProcessorNode(doc, currentNode, group); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "connection") == 0) { - this->parseConnection(doc, currentNode, group); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "remoteProcessGroup") == 0) { - this->parseRemoteProcessGroup(doc, currentNode, group); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} - -void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { - uuid_t uuid; - ProcessGroup *group = NULL; - - // generate the random UIID - uuid_generate(uuid); - - std::string flowName = rootFlowNode["name"].as<std::string>(); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr); - _logger->log_debug("parseRootProcessGroup: name => [%s]", flowName.c_str()); - group = this->createRootProcessGroup(flowName, uuid); - this->_root = group; - this->_name = flowName; -} - -void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, ProcessGroup 
*parentGroup) { - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - int64_t runDurationNanos = -1; - uuid_t uuid; - Processor *processor = NULL; - - if (!parentGroup) { - _logger->log_error("parseProcessNodeYaml: no parent group exists"); - return; - } - - if (processorsNode) { - // Evaluate sequence of processors - int numProcessors = processorsNode.size(); - if (numProcessors < 1) { - throw new std::invalid_argument("There must be at least one processor configured."); - } - - std::vector<ProcessorConfig> processorConfigs; - - if (processorsNode.IsSequence()) { - for (YAML::const_iterator iter = processorsNode.begin(); iter != processorsNode.end(); ++iter) { - ProcessorConfig procCfg; - YAML::Node procNode = iter->as<YAML::Node>(); - - procCfg.name = procNode["name"].as<std::string>(); - _logger->log_debug("parseProcessorNode: name => [%s]", procCfg.name.c_str()); - procCfg.javaClass = procNode["class"].as<std::string>(); - _logger->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass.c_str()); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - // generate the random UUID - uuid_generate(uuid); - - // Determine the processor name only from the Java class - int lastOfIdx = procCfg.javaClass.find_last_of("."); - if (lastOfIdx != std::string::npos) { - lastOfIdx++; // if a value is found, increment to move beyond the . 
- int nameLength = procCfg.javaClass.length() - lastOfIdx; - std::string processorName = procCfg.javaClass.substr(lastOfIdx, nameLength); - processor = this->createProcessor(processorName, uuid); - } - - if (!processor) { - _logger->log_error("Could not create a processor %s with name %s", procCfg.name.c_str(), uuidStr); - throw std::invalid_argument("Could not create processor " + procCfg.name); - } - processor->setName(procCfg.name); - - procCfg.maxConcurrentTasks = procNode["max concurrent tasks"].as<std::string>(); - _logger->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str()); - procCfg.schedulingStrategy = procNode["scheduling strategy"].as<std::string>(); - _logger->log_debug("parseProcessorNode: scheduling strategy => [%s]", - procCfg.schedulingStrategy.c_str()); - procCfg.schedulingPeriod = procNode["scheduling period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod.c_str()); - procCfg.penalizationPeriod = procNode["penalization period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: penalization period => [%s]", - procCfg.penalizationPeriod.c_str()); - procCfg.yieldPeriod = procNode["yield period"].as<std::string>(); - _logger->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod.c_str()); - procCfg.yieldPeriod = procNode["run duration nanos"].as<std::string>(); - _logger->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos.c_str()); - - // handle auto-terminated relationships - YAML::Node autoTerminatedSequence = procNode["auto-terminated relationships list"]; - std::vector<std::string> rawAutoTerminatedRelationshipValues; - if (autoTerminatedSequence.IsSequence() && !autoTerminatedSequence.IsNull() - && autoTerminatedSequence.size() > 0) { - for (YAML::const_iterator relIter = autoTerminatedSequence.begin(); - relIter != autoTerminatedSequence.end(); ++relIter) { - 
std::string autoTerminatedRel = relIter->as<std::string>(); - rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel); - } - } - procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues; - - // handle processor properties - YAML::Node propertiesNode = procNode["Properties"]; - parsePropertiesNodeYaml(&propertiesNode, processor); - - // Take care of scheduling - TimeUnit unit; - if (Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - _logger->log_debug("convert: parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - - if (Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - _logger->log_debug("convert: parseProcessorNode: penalizationPeriod => [%d] ms", - penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - - if (Property::StringToTime(procCfg.yieldPeriod, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - _logger->log_debug("convert: parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - - // Default to running - processor->setScheduledState(RUNNING); - - if (procCfg.schedulingStrategy == "TIMER_DRIVEN") { - processor->setSchedulingStrategy(TIMER_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") { - processor->setSchedulingStrategy(EVENT_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } else { - processor->setSchedulingStrategy(CRON_DRIVEN); - _logger->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - - } - - int64_t 
maxConcurrentTasks; - if (Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - - if (Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) { - _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - - std::set<Relationship> autoTerminatedRelationships; - for (auto&& relString : procCfg.autoTerminatedRelationships) { - Relationship relationship(relString, ""); - _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString.c_str()); - autoTerminatedRelationships.insert(relationship); - } - - processor->setAutoTerminatedRelationships(autoTerminatedRelationships); - - parentGroup->addProcessor(processor); - } - } - } else { - throw new std::invalid_argument( - "Cannot instantiate a MiNiFi instance without a defined Processors configuration node."); - } -} - -void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, ProcessGroup *parentGroup) { - uuid_t uuid; - - if (!parentGroup) { - _logger->log_error("parseRemoteProcessGroupYaml: no parent group exists"); - return; - } - - if (rpgNode) { - if (rpgNode->IsSequence()) { - for (YAML::const_iterator iter = rpgNode->begin(); iter != rpgNode->end(); ++iter) { - YAML::Node rpgNode = iter->as<YAML::Node>(); - - auto name = rpgNode["name"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str()); - - std::string url = rpgNode["url"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str()); - - std::string timeout = rpgNode["timeout"].as<std::string>(); - _logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", timeout.c_str()); - - std::string yieldPeriod = rpgNode["yield period"].as<std::string>(); - 
_logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", yieldPeriod.c_str()); - - YAML::Node inputPorts = rpgNode["Input Ports"].as<YAML::Node>(); - ProcessGroup* group = NULL; - - // generate the random UUID - uuid_generate(uuid); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - - int64_t timeoutValue = -1; - int64_t yieldPeriodValue = -1; - - group = this->createRemoteProcessGroup(name.c_str(), uuid); - group->setParent(parentGroup); - parentGroup->addProcessGroup(group); - - TimeUnit unit; - - if (Property::StringToTime(yieldPeriod, yieldPeriodValue, unit) - && Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", yieldPeriodValue); - group->setYieldPeriodMsec(yieldPeriodValue); - } - - if (Property::StringToTime(timeout, timeoutValue, unit) - && Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) { - _logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", timeoutValue); - group->setTimeOut(timeoutValue); - } - - group->setTransmitting(true); - group->setURL(url); - - if (inputPorts.IsSequence()) { - for (YAML::const_iterator portIter = inputPorts.begin(); portIter != inputPorts.end(); ++portIter) { - _logger->log_debug("Got a current port, iterating..."); - - YAML::Node currPort = portIter->as<YAML::Node>(); - - this->parsePortYaml(&currPort, group, SEND); - } // for node - } - } - } - } -} -
/**
 * Creates one Connection per entry of the YAML connection list and adds it
 * to the parent process group.
 *
 * Each entry is read for the keys "name", "destination name",
 * "source relationship name" and "source name". The source processor is
 * looked up by name under the root group. The destination is looked up by
 * name first and, when that fails and the text parses as a UUID, by UUID.
 *
 * @param connectionsNode  YAML node holding the connection list; a null
 *                         pointer or a non-sequence node is ignored.
 * @param parent           group receiving the connections; when null an
 *                         error is logged and nothing is parsed.
 * @throws std::invalid_argument when the list is empty, or when the source
 *                         or destination processor of an entry cannot be
 *                         located.
 */
void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, ProcessGroup *parent) {
  if (!parent) {
    _logger->log_error("parseProcessNode: no parent group was provided");
    return;
  }

  if (!connectionsNode) {
    return;
  }

  if (connectionsNode->size() < 1) {
    // Thrown by value so that handlers written for std::exception references
    // can catch it, matching the other throw in this function.
    throw std::invalid_argument("There must be at least one connection configured.");
  }

  if (!connectionsNode->IsSequence()) {
    return;
  }

  for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
    YAML::Node connectionNode = iter->as<YAML::Node>();

    const std::string name = connectionNode["name"].as<std::string>();
    const std::string destName = connectionNode["destination name"].as<std::string>();
    const std::string rawRelationship = connectionNode["source relationship name"].as<std::string>();
    const std::string connectionSrcProcName = connectionNode["source name"].as<std::string>();

    _logger->log_debug("parseConnection: relationship => [%s]", rawRelationship.c_str());

    // Resolve both endpoints before creating the connection, so a bad entry
    // does not leave behind a connection object that no group holds.
    Processor *srcProcessor = this->_root->findProcessor(connectionSrcProcName);
    if (!srcProcessor) {
      _logger->log_error("Could not locate a source with name %s to create a connection",
          connectionSrcProcName.c_str());
      throw std::invalid_argument(
          "Could not locate a source with name " + connectionSrcProcName + " to create a connection");
    }

    Processor *destProcessor = this->_root->findProcessor(destName);
    if (!destProcessor) {
      // Not a known name: try the value as a UUID, but only when it parses,
      // otherwise the lookup would run on an uninitialised uuid_t.
      uuid_t parsedDestUuid;
      if (uuid_parse(destName.c_str(), parsedDestUuid) == 0) {
        destProcessor = this->_root->findProcessor(parsedDestUuid);
      }
    }
    if (!destProcessor) {
      _logger->log_error("Could not locate a destination with name %s to create a connection",
          destName.c_str());
      throw std::invalid_argument(
          "Could not locate a destination with name " + destName + " to create a connection");
    }

    // Every connection gets a freshly generated random UUID.
    uuid_t uuid;
    uuid_generate(uuid);

    Connection *connection = this->createConnection(name, uuid);
    if (!connection) {
      _logger->log_error("Could not create connection with name %s", name.c_str());
      continue;
    }

    char uuidStr[37];
    uuid_unparse(uuid, uuidStr);
    _logger->log_debug("Created connection with UUID %s and name %s", uuidStr, name.c_str());

    connection->setRelationship(Relationship(rawRelationship, ""));

    uuid_t srcUuid;
    srcProcessor->getUUID(srcUuid);
    connection->setSourceProcessorUUID(srcUuid);

    uuid_t destUuid;
    destProcessor->getUUID(destUuid);
    connection->setDestinationProcessorUUID(destUuid);

    // Added exactly once per connection.
    parent->addConnection(connection);
  }
}

-void
FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent) { - uuid_t uuid; - xmlNode *currentNode; - ProcessGroup *group = NULL; - int64_t yieldPeriod = -1; - int64_t timeOut = -1; - -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - char *id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseRootProcessGroup: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseRemoteProcessGroup: name => [%s]", name); - group = this->createRemoteProcessGroup(name, uuid); - if (group == NULL) { - xmlFree(name); - return; - } - group->setParent(parent); - parent->addProcessGroup(group); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) { - _logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", yieldPeriod); - group->setYieldPeriodMsec(yieldPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "timeout") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, timeOut, unit) - && Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) { - _logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut); - group->setTimeOut(timeOut); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "transmitting") == 0) { - char *temp = (char *) 
xmlNodeGetContent(currentNode); - bool transmitting; - if (temp) { - if (Property::StringToBool(temp, transmitting) && group) { - _logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", transmitting); - group->setTransmitting(transmitting); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "inputPort") == 0 && group) { - this->parsePort(doc, currentNode, group, SEND); - } else if (xmlStrcmp(currentNode->name, BAD_CAST "outputPort") == 0 && group) { - this->parsePort(doc, currentNode, group, RECEIVE); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} - -void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor) { - xmlNode *currentNode; - std::string propertyValue; - std::string propertyName; - - if (!processor) { - _logger->log_error("parseProcessorProperty: no parent processor existed"); - return; - } - - for (currentNode = node->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - char *name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - propertyName = name; - xmlFree(name); - } - } - if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 0) { - char *value = (char *) xmlNodeGetContent(currentNode); - if (value) { - _logger->log_debug("parseProcessorNode: value => [%s]", value); - propertyValue = value; - xmlFree(value); - } - } - if (!propertyName.empty() && !propertyValue.empty()) { - processor->setProperty(propertyName, propertyValue); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // for node -} - -void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction) { - uuid_t uuid; - Processor *processor = NULL; - RemoteProcessorGroupPort *port = NULL; - - if (!parent) { - _logger->log_error("parseProcessNode: no 
parent group existed"); - return; - } - - YAML::Node inputPortsObj = portNode->as<YAML::Node>(); - - // generate the random UIID - uuid_generate(uuid); - - auto portId = inputPortsObj["id"].as<std::string>(); - auto nameStr = inputPortsObj["name"].as<std::string>(); - uuid_parse(portId.c_str(), uuid); - - port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid); - - processor = (Processor *) port; - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(true); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); - - // handle port properties - YAML::Node nodeVal = portNode->as<YAML::Node>(); - YAML::Node propertiesNode = nodeVal["Properties"]; - - parsePropertiesNodeYaml(&propertiesNode, processor); - - // add processor to parent - parent->addProcessor(processor); - processor->setScheduledState(RUNNING); - auto rawMaxConcurrentTasks = inputPortsObj["max concurrent tasks"].as<std::string>(); - int64_t maxConcurrentTasks; - if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - -} - -void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction) { - char *id = NULL; - char *name = NULL; - uuid_t uuid; - xmlNode *currentNode; - Processor *processor = NULL; - RemoteProcessorGroupPort *port = NULL; - - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - id = (char *) xmlNodeGetContent(currentNode); - if 
(id) { - _logger->log_debug("parseProcessorNode: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - port = new RemoteProcessorGroupPort(name, uuid); - processor = (Processor *) port; - if (processor == NULL) { - xmlFree(name); - return; - } - port->setDirection(direction); - port->setTimeOut(parent->getTimeOut()); - port->setTransmitting(parent->getTransmitting()); - processor->setYieldPeriodMsec(parent->getYieldPeriodMsec()); - processor->initialize(); - // add processor to parent - parent->addProcessor(processor); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string state = temp; - if (state == "DISABLED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(DISABLED); - } - if (state == "STOPPED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(STOPPED); - } - if (state == "RUNNING") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(RUNNING); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - int64_t maxConcurrentTasks; - if (Property::StringToInt(temp, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { - this->parseProcessorProperty(doc, currentNode, processor); - } - } // if (currentNode->type == 
XML_ELEMENT_NODE) - } // while node -} - -void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent) { - char *id = NULL; - char *name = NULL; - int64_t schedulingPeriod = -1; - int64_t penalizationPeriod = -1; - int64_t yieldPeriod = -1; - bool lossTolerant = false; - int64_t runDurationNanos = -1; - uuid_t uuid; - xmlNode *currentNode; - Processor *processor = NULL; - - if (!parent) { - _logger->log_error("parseProcessNode: no parent group existed"); - return; - } -// generate the random UIID - uuid_generate(uuid); - - for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) { - id = (char *) xmlNodeGetContent(currentNode); - if (id) { - _logger->log_debug("parseProcessorNode: id => [%s]", id); - uuid_parse(id, uuid); - xmlFree(id); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) { - name = (char *) xmlNodeGetContent(currentNode); - if (name) { - _logger->log_debug("parseProcessorNode: name => [%s]", name); - processor = this->createProcessor(name, uuid); - if (processor == NULL) { - xmlFree(name); - return; - } - // add processor to parent - parent->addProcessor(processor); - xmlFree(name); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, schedulingPeriod, unit) - && Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) { - _logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod); - processor->setSchedulingPeriodNano(schedulingPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "penalizationPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if 
(Property::StringToTime(temp, penalizationPeriod, unit) - && Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) { - _logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", penalizationPeriod); - processor->setPenalizationPeriodMsec(penalizationPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "yieldPeriod") == 0) { - TimeUnit unit; - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToTime(temp, yieldPeriod, unit) - && Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) { - _logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod); - processor->setYieldPeriodMsec(yieldPeriod); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "lossTolerant") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToBool(temp, lossTolerant)) { - _logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant); - processor->setlossTolerant(lossTolerant); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "scheduledState") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string state = temp; - if (state == "DISABLED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(DISABLED); - } - if (state == "STOPPED") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(STOPPED); - } - if (state == "RUNNING") { - _logger->log_debug("parseProcessorNode: scheduledState => [%s]", state.c_str()); - processor->setScheduledState(RUNNING); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "schedulingStrategy") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string strategy = temp; - if (strategy == "TIMER_DRIVEN") { - 
_logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); - processor->setSchedulingStrategy(TIMER_DRIVEN); - } - if (strategy == "EVENT_DRIVEN") { - _logger->log_debug("parseProcessorNode: scheduledStrategy => [%s]", strategy.c_str()); - processor->setSchedulingStrategy(EVENT_DRIVEN); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - int64_t maxConcurrentTasks; - if (Property::StringToInt(temp, maxConcurrentTasks)) { - _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); - processor->setMaxConcurrentTasks(maxConcurrentTasks); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - if (Property::StringToInt(temp, runDurationNanos)) { - _logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", runDurationNanos); - processor->setRunDurationNano(runDurationNanos); - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "autoTerminatedRelationship") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - if (temp) { - std::string relationshipName = temp; - Relationship relationship(relationshipName, ""); - std::set<Relationship> relationships; - - relationships.insert(relationship); - processor->setAutoTerminatedRelationships(relationships); - _logger->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", - relationshipName.c_str()); - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "property") == 0) { - this->parseProcessorProperty(doc, currentNode, processor); - } - } // if (currentNode->type == XML_ELEMENT_NODE) - } // while node -} - -void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor) -{ - // Treat generically as a YAML node so we can 
perform inspection on entries to ensure they are populated - for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter != propertiesNode->end(); ++propsIter) - { - std::string propertyName = propsIter->first.as<std::string>(); - YAML::Node propertyValueNode = propsIter->second; - if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined()) - { - std::string rawValueString = propertyValueNode.as<std::string>(); - if (!processor->setProperty(propertyName, rawValueString)) - { - _logger->log_warn("Received property %s with value %s but is not one of the properties for %s", propertyName.c_str(), rawValueString.c_str(), processor->getName().c_str()); - } - } - } -} - -void FlowController::load(ConfigFormat configFormat) { - if (_running) { - stop(true); - } - if (!_initialized) { - _logger->log_info("Load Flow Controller from file %s", _configurationFileName.c_str()); - - if (ConfigFormat::XML == configFormat) { - _logger->log_info("Detected an XML configuration file for processing."); - - xmlDoc *doc = xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET); - if (doc == NULL) { - _logger->log_error("xmlReadFile returned NULL when reading [%s]", _configurationFileName.c_str()); - _initialized = true; - return; - } - - xmlNode *root = xmlDocGetRootElement(doc); - - if (root == NULL) { - _logger->log_error("Can not get root from XML doc %s", _configurationFileName.c_str()); - xmlFreeDoc(doc); - xmlCleanupParser(); - } - - if (xmlStrcmp(root->name, BAD_CAST "flowController") != 0) { - _logger->log_error("Root name is not flowController for XML doc %s", _configurationFileName.c_str()); - xmlFreeDoc(doc); - xmlCleanupParser(); - return; - } - - xmlNode *currentNode; - - for (currentNode = root->xmlChildrenNode; currentNode != NULL; currentNode = currentNode->next) { - if (currentNode->type == XML_ELEMENT_NODE) { - if (xmlStrcmp(currentNode->name, BAD_CAST "rootGroup") == 0) { - this->parseRootProcessGroup(doc, currentNode); - } else if 
(xmlStrcmp(currentNode->name, BAD_CAST "maxTimerDrivenThreadCount") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxTimerDrivenThreadCount; - if (temp) { - if (Property::StringToInt(temp, maxTimerDrivenThreadCount)) { - _logger->log_debug("maxTimerDrivenThreadCount => [%d]", maxTimerDrivenThreadCount); - this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount; - } - xmlFree(temp); - } - } else if (xmlStrcmp(currentNode->name, BAD_CAST "maxEventDrivenThreadCount") == 0) { - char *temp = (char *) xmlNodeGetContent(currentNode); - int64_t maxEventDrivenThreadCount; - if (temp) { - if (Property::StringToInt(temp, maxEventDrivenThreadCount)) { - _logger->log_debug("maxEventDrivenThreadCount => [%d]", maxEventDrivenThreadCount); - this->_maxEventDrivenThreads = maxEventDrivenThreadCount; - } - xmlFree(temp); - } - } - } // type == XML_ELEMENT_NODE - } // for - - xmlFreeDoc(doc); - xmlCleanupParser(); - _initialized = true; - } else if (ConfigFormat::YAML == configFormat) { - YAML::Node flow = YAML::LoadFile(_configurationFileName); - - YAML::Node flowControllerNode = flow["Flow Controller"]; - YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; - YAML::Node connectionsNode = flow["Connections"]; - YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; - - // Create the root process group - parseRootProcessGroupYaml(flowControllerNode); - parseProcessorNodeYaml(processorsNode, this->_root); - parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, this->_root); - parseConnectionYaml(&connectionsNode, this->_root); - - _initialized = true; - } - } -} - -bool FlowController::start() { - if (!_initialized) { - _logger->log_error("Can not start Flow Controller because it has not been initialized"); - return false; - } else { - if (!_running) { - _logger->log_info("Start Flow Controller"); - this->_timerScheduler.start(); - if (this->_root) - this->_root->startProcessing(&this->_timerScheduler); - _running = true; - 
this->_protocol->start(); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp deleted file mode 100644 index 2dda47a..0000000 --- a/src/FlowFileRecord.cpp +++ /dev/null @@ -1,231 +0,0 @@ -/** - * @file FlowFileRecord.cpp - * Flow file record class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <vector> -#include <queue> -#include <map> -#include <sys/time.h> -#include <time.h> -#include <iostream> -#include <fstream> -#include <cstdio> - -#include "FlowFileRecord.h" -#include "Relationship.h" -#include "Logger.h" - -std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0); - -FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim) -: _size(0), - _id(_localFlowSeqNumber.load()), - _offset(0), - _penaltyExpirationMs(0), - _claim(claim), - _markedDelete(false), - _connection(NULL), - _orginalConnection(NULL) -{ - _entryDate = getTimeMillis(); - _lineageStartDate = _entryDate; - - char uuidStr[37]; - - // Generate the global UUID for the flow record - uuid_generate(_uuid); - // Increase the local ID for the flow record - ++_localFlowSeqNumber; - uuid_unparse(_uuid, uuidStr); - _uuidStr = uuidStr; - - // Populate the default attributes - addAttribute(FILENAME, std::to_string(getTimeNano())); - addAttribute(PATH, DEFAULT_FLOWFILE_PATH); - addAttribute(UUID, uuidStr); - // Populate the attributes from the input - std::map<std::string, std::string>::iterator it; - for (it = attributes.begin(); it!= attributes.end(); it++) - { - addAttribute(it->first, it->second); - } - - _snapshot = false; - - if (_claim) - // Increase the flow file record owned count for the resource claim - _claim->increaseFlowFileRecordOwnedCount(); - _logger = Logger::getLogger(); -} - -FlowFileRecord::~FlowFileRecord() -{ - if (!_snapshot) - _logger->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str()); - else - _logger->log_debug("Delete SnapShot FlowFile UUID %s", _uuidStr.c_str()); - if (_claim) - { - // Decrease the flow file record owned count for the resource claim - _claim->decreaseFlowFileRecordOwnedCount(); - if (_claim->getFlowFileRecordOwnedCount() == 0) - { - _logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str()); - std::remove(_claim->getContentFullPath().c_str()); - delete _claim; - 
} - } -} - -bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return addAttribute(keyString, value); - } - else - { - return false; - } -} - -bool FlowFileRecord::addAttribute(std::string key, std::string value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - // attribute already there in the map - return false; - } - else - { - _attributes[key] = value; - return true; - } -} - -bool FlowFileRecord::removeAttribute(FlowAttribute key) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return removeAttribute(keyString); - } - else - { - return false; - } -} - -bool FlowFileRecord::removeAttribute(std::string key) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - _attributes.erase(key); - return true; - } - else - { - return false; - } -} - -bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return updateAttribute(keyString, value); - } - else - { - return false; - } -} - -bool FlowFileRecord::updateAttribute(std::string key, std::string value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if (it != _attributes.end()) - { - _attributes[key] = value; - return true; - } - else - { - return false; - } -} - -bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value) -{ - const char *keyStr = FlowAttributeKey(key); - if (keyStr) - { - std::string keyString = keyStr; - return getAttribute(keyString, value); - } - else - { - return false; - } -} - -bool FlowFileRecord::getAttribute(std::string key, std::string &value) -{ - std::map<std::string, std::string>::iterator it = _attributes.find(key); - if 
(it != _attributes.end()) - { - value = it->second; - return true; - } - else - { - return false; - } -} - -void FlowFileRecord::duplicate(FlowFileRecord *original) -{ - uuid_copy(this->_uuid, original->_uuid); - this->_attributes = original->_attributes; - this->_entryDate = original->_entryDate; - this->_id = original->_id; - this->_lastQueueDate = original->_lastQueueDate; - this->_lineageStartDate = original->_lineageStartDate; - this->_offset = original->_offset; - this->_penaltyExpirationMs = original->_penaltyExpirationMs; - this->_size = original->_size; - this->_lineageIdentifiers = original->_lineageIdentifiers; - this->_orginalConnection = original->_orginalConnection; - this->_uuidStr = original->_uuidStr; - this->_connection = original->_connection; - this->_markedDelete = original->_markedDelete; - - this->_claim = original->_claim; - if (this->_claim) - this->_claim->increaseFlowFileRecordOwnedCount(); - - this->_snapshot = true; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/src/GenerateFlowFile.cpp b/src/GenerateFlowFile.cpp deleted file mode 100644 index 4b0603d..0000000 --- a/src/GenerateFlowFile.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/** - * @file GenerateFlowFile.cpp - * GenerateFlowFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> - -#include "GenerateFlowFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; -const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; -const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile"); -Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); -Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); -Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); -Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", - "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); -Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); - -void GenerateFlowFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FileSize); - properties.insert(BatchSize); - properties.insert(DataFormat); - properties.insert(UniqueFlowFiles); - setSupportedProperties(properties); - //! 
Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - int64_t batchSize = 1; - bool uniqueFlowFile = true; - int64_t fileSize = 1024; - - std::string value; - if (context->getProperty(FileSize.getName(), value)) - { - Property::StringToInt(value, fileSize); - } - if (context->getProperty(BatchSize.getName(), value)) - { - Property::StringToInt(value, batchSize); - } - if (context->getProperty(UniqueFlowFiles.getName(), value)) - { - Property::StringToBool(value, uniqueFlowFile); - } - - if (!uniqueFlowFile) - { - char *data; - data = new char[fileSize]; - if (!data) - return; - uint64_t dataSize = fileSize; - GenerateFlowFile::WriteCallback callback(data, dataSize); - char *current = data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - current += sizeof(int); - } - for (int i = 0; i < batchSize; i++) - { - // For each batch - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - delete[] data; - } - else - { - if (!_data) - { - // We have not create the unique data yet - _data = new char[fileSize]; - _dataSize = fileSize; - char *current = _data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - // *((int *) current) = (0xFFFFFFFF & i); - current += sizeof(int); - } - } - GenerateFlowFile::WriteCallback callback(_data, _dataSize); - for (int i = 0; i < batchSize; i++) - { - // For each batch - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - } -} 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/GetFile.cpp ---------------------------------------------------------------------- diff --git a/src/GetFile.cpp b/src/GetFile.cpp deleted file mode 100644 index 02e196a..0000000 --- a/src/GetFile.cpp +++ /dev/null @@ -1,295 +0,0 @@ -/** - * @file GetFile.cpp - * GetFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <time.h> -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <dirent.h> -#include <limits.h> -#include <unistd.h> -#include <regex> - -#include "TimeUtil.h" -#include "GetFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string GetFile::ProcessorName("GetFile"); -Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10"); -Property GetFile::Directory("Input Directory", "The input directory from which to pull files", "."); -Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true"); -Property GetFile::KeepSourceFile("Keep Source File", - "If true, the file is not deleted after it has been copied to the Content Repository", "false"); -Property GetFile::MaxAge("Maximum File Age", - "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec"); -Property GetFile::MinAge("Minimum File Age", - "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec"); -Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B"); -Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B"); -Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec"); -Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true"); -Property GetFile::FileFilter("File Filter", "Only files whose names match the given regular expression 
will be picked up", "[^\\.].*"); -Relationship GetFile::Success("success", "All files are routed to success"); - -void GetFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(BatchSize); - properties.insert(Directory); - properties.insert(IgnoreHiddenFile); - properties.insert(KeepSourceFile); - properties.insert(MaxAge); - properties.insert(MinAge); - properties.insert(MaxSize); - properties.insert(MinSize); - properties.insert(PollInterval); - properties.insert(Recurse); - properties.insert(FileFilter); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void GetFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - if (context->getProperty(Directory.getName(), value)) - { - _directory = value; - } - if (context->getProperty(BatchSize.getName(), value)) - { - Property::StringToInt(value, _batchSize); - } - if (context->getProperty(IgnoreHiddenFile.getName(), value)) - { - Property::StringToBool(value, _ignoreHiddenFile); - } - if (context->getProperty(KeepSourceFile.getName(), value)) - { - Property::StringToBool(value, _keepSourceFile); - } - if (context->getProperty(MaxAge.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _maxAge, unit) && - Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge)) - { - - } - } - if (context->getProperty(MinAge.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _minAge, unit) && - Property::ConvertTimeUnitToMS(_minAge, unit, _minAge)) - { - - } - } - if (context->getProperty(MaxSize.getName(), value)) - { - Property::StringToInt(value, _maxSize); - } - if (context->getProperty(MinSize.getName(), value)) - { - Property::StringToInt(value, _minSize); - } - if (context->getProperty(PollInterval.getName(), value)) - { - TimeUnit unit; - if 
(Property::StringToTime(value, _pollInterval, unit) && - Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval)) - { - - } - } - if (context->getProperty(Recurse.getName(), value)) - { - Property::StringToBool(value, _recursive); - } - - if (context->getProperty(FileFilter.getName(), value)) - { - _fileFilter = value; - } - - // Perform directory list - if (isListingEmpty()) - { - if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) - { - performListing(_directory); - } - } - - if (!isListingEmpty()) - { - try - { - std::queue<std::string> list; - pollListing(list, _batchSize); - while (!list.empty()) - { - std::string fileName = list.front(); - list.pop(); - _logger->log_info("GetFile process %s", fileName.c_str()); - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - std::size_t found = fileName.find_last_of("/\\"); - std::string path = fileName.substr(0,found); - std::string name = fileName.substr(found+1); - flowFile->updateAttribute(FILENAME, name); - flowFile->updateAttribute(PATH, path); - flowFile->addAttribute(ABSOLUTE_PATH, fileName); - session->import(fileName, flowFile, _keepSourceFile); - session->transfer(flowFile, Success); - } - } - catch (std::exception &exception) - { - _logger->log_debug("GetFile Caught Exception %s", exception.what()); - throw; - } - catch (...) 
- { - throw; - } - } -} - -bool GetFile::isListingEmpty() -{ - std::lock_guard<std::mutex> lock(_mtx); - - return _dirList.empty(); -} - -void GetFile::putListing(std::string fileName) -{ - std::lock_guard<std::mutex> lock(_mtx); - - _dirList.push(fileName); -} - -void GetFile::pollListing(std::queue<std::string> &list, int maxSize) -{ - std::lock_guard<std::mutex> lock(_mtx); - - while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize)) - { - std::string fileName = _dirList.front(); - _dirList.pop(); - list.push(fileName); - } - - return; -} - -bool GetFile::acceptFile(std::string fileName) -{ - struct stat statbuf; - - if (stat(fileName.c_str(), &statbuf) == 0) - { - if (_minSize > 0 && statbuf.st_size <_minSize) - return false; - - if (_maxSize > 0 && statbuf.st_size > _maxSize) - return false; - - uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); - uint64_t fileAge = getTimeMillis() - modifiedTime; - if (_minAge > 0 && fileAge < _minAge) - return false; - if (_maxAge > 0 && fileAge > _maxAge) - return false; - - if (_ignoreHiddenFile && fileName.c_str()[0] == '.') - return false; - - if (access(fileName.c_str(), R_OK) != 0) - return false; - - if (_keepSourceFile == false && access(fileName.c_str(), W_OK) != 0) - return false; - - try { - std::regex re(_fileFilter); - if (!std::regex_match(fileName, re)) { - return false; - } - } catch (std::regex_error e) { - _logger->log_error("Invalid File Filter regex: %s.", e.what()); - return false; - } - - return true; - } - - return false; -} - -void GetFile::performListing(std::string dir) -{ - DIR *d; - d = opendir(dir.c_str()); - if (!d) - return; - while (1) - { - struct dirent *entry; - entry = readdir(d); - if (!entry) - break; - std::string d_name = entry->d_name; - if ((entry->d_type & DT_DIR)) - { - // if this is a directory - if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) - { - std::string path = dir + "/" + d_name; - performListing(path); - } 
- } - else - { - std::string fileName = dir + "/" + d_name; - if (acceptFile(fileName)) - { - // check whether we can take this file - putListing(fileName); - } - } - } - closedir(d); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp deleted file mode 100644 index ace37d7..0000000 --- a/src/ListenSyslog.cpp +++ /dev/null @@ -1,342 +0,0 @@ -/** - * @file ListenSyslog.cpp - * ListenSyslog class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <queue> -#include <stdio.h> -#include <string> -#include "TimeUtil.h" -#include "ListenSyslog.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string ListenSyslog::ProcessorName("ListenSyslog"); -Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); -Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); -Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); -Property ListenSyslog::MaxBatchSize("Max Batch Size", - "The maximum number of Syslog events to add to a single FlowFile.", "1"); -Property ListenSyslog::MessageDelimiter("Message Delimiter", - "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n"); -Property ListenSyslog::ParseMessages("Parse Messages", - "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); -Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); -Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); -Relationship ListenSyslog::Success("success", "All files are routed to success"); -Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); - -void ListenSyslog::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(RecvBufSize); - properties.insert(MaxSocketBufSize); - properties.insert(MaxConnections); - properties.insert(MaxBatchSize); - properties.insert(MessageDelimiter); - properties.insert(ParseMessages); - properties.insert(Protocol); - properties.insert(Port); - setSupportedProperties(properties); - //! 
Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - relationships.insert(Invalid); - setSupportedRelationships(relationships); -} - -void ListenSyslog::startSocketThread() -{ - if (_thread != NULL) - return; - - _logger->log_info("ListenSysLog Socket Thread Start"); - _serverTheadRunning = true; - _thread = new std::thread(run, this); - _thread->detach(); -} - -void ListenSyslog::run(ListenSyslog *process) -{ - process->runThread(); -} - -void ListenSyslog::runThread() -{ - while (_serverTheadRunning) - { - if (_resetServerSocket) - { - _resetServerSocket = false; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) - { - close(_serverSocket); - _serverSocket = 0; - } - } - - if (_serverSocket <= 0) - { - uint16_t portno = _port; - struct sockaddr_in serv_addr; - int sockfd; - if (_protocol == "TCP") - sockfd = socket(AF_INET, SOCK_STREAM, 0); - else - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (sockfd < 0) - { - _logger->log_info("ListenSysLog Server socket creation failed"); - break; - } - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - if (bind(sockfd, (struct sockaddr *) &serv_addr, - sizeof(serv_addr)) < 0) - { - _logger->log_error("ListenSysLog Server socket bind failed"); - break; - } - if (_protocol == "TCP") - listen(sockfd,5); - _serverSocket = sockfd; - _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); - } - FD_ZERO(&_readfds); - FD_SET(_serverSocket, &_readfds); - _maxFds = _serverSocket; - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - if (clientSocket >= _maxFds) - 
_maxFds = clientSocket; - FD_SET(clientSocket, &_readfds); - } - fd_set fds; - struct timeval tv; - int retval; - fds = _readfds; - tv.tv_sec = 0; - // 100 msec - tv.tv_usec = 100000; - retval = select(_maxFds+1, &fds, NULL, NULL, &tv); - if (retval < 0) - break; - if (retval == 0) - continue; - if (FD_ISSET(_serverSocket, &fds)) - { - // server socket, either we have UDP datagram or TCP connection request - if (_protocol == "TCP") - { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int newsockfd = accept(_serverSocket, - (struct sockaddr *) &cli_addr, - &clilen); - if (newsockfd > 0) - { - if (_clientSockets.size() < _maxConnections) - { - _clientSockets.push_back(newsockfd); - _logger->log_info("ListenSysLog new client socket %d connection", newsockfd); - continue; - } - else - { - close(newsockfd); - } - } - } - else - { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, - (struct sockaddr *)&cli_addr, &clilen); - if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) - { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - } - } - it = _clientSockets.begin(); - while (it != _clientSockets.end()) - { - int clientSocket = *it; - if (FD_ISSET(clientSocket, &fds)) - { - int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer)); - if (recvlen <= 0) - { - close(clientSocket); - _logger->log_info("ListenSysLog client socket %d close", clientSocket); - it = _clientSockets.erase(it); - } - else - { - if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) - { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - ++it; - } - } - } - } - return; -} - - -int ListenSyslog::readline( int fd, char *bufptr, size_t len ) -{ - char *bufx = bufptr; - static char *bp; - static int cnt = 0; - static 
char b[ 2048 ]; - char c; - - while ( --len > 0 ) - { - if ( --cnt <= 0 ) - { - cnt = recv( fd, b, sizeof( b ), 0 ); - if ( cnt < 0 ) - { - if ( errno == EINTR ) - { - len++; /* the while will decrement */ - continue; - } - return -1; - } - if ( cnt == 0 ) - return 0; - bp = b; - } - c = *bp++; - *bufptr++ = c; - if ( c == '\n' ) - { - *bufptr = '\n'; - return bufptr - bufx + 1; - } - } - return -1; -} - -void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - bool needResetServerSocket = false; - if (context->getProperty(Protocol.getName(), value)) - { - if (_protocol != value) - needResetServerSocket = true; - _protocol = value; - } - if (context->getProperty(RecvBufSize.getName(), value)) - { - Property::StringToInt(value, _recvBufSize); - } - if (context->getProperty(MaxSocketBufSize.getName(), value)) - { - Property::StringToInt(value, _maxSocketBufSize); - } - if (context->getProperty(MaxConnections.getName(), value)) - { - Property::StringToInt(value, _maxConnections); - } - if (context->getProperty(MessageDelimiter.getName(), value)) - { - _messageDelimiter = value; - } - if (context->getProperty(ParseMessages.getName(), value)) - { - Property::StringToBool(value, _parseMessages); - } - if (context->getProperty(Port.getName(), value)) - { - int64_t oldPort = _port; - Property::StringToInt(value, _port); - if (_port != oldPort) - needResetServerSocket = true; - } - if (context->getProperty(MaxBatchSize.getName(), value)) - { - Property::StringToInt(value, _maxBatchSize); - } - - if (needResetServerSocket) - _resetServerSocket = true; - - startSocketThread(); - - // read from the event queue - if (isEventQueueEmpty()) - { - context->yield(); - return; - } - - std::queue<SysLogEvent> eventQueue; - pollEvent(eventQueue, _maxBatchSize); - bool firstEvent = true; - FlowFileRecord *flowFile = NULL; - while(!eventQueue.empty()) - { - SysLogEvent event = eventQueue.front(); - eventQueue.pop(); - if (firstEvent) - { - 
flowFile = session->create(); - if (!flowFile) - return; - ListenSyslog::WriteCallback callback((char *)event.payload, event.len); - session->write(flowFile, &callback); - delete[] event.payload; - firstEvent = false; - } - else - { - ListenSyslog::WriteCallback callback((char *)event.payload, event.len); - session->append(flowFile, &callback); - delete[] event.payload; - } - } - flowFile->addAttribute("syslog.protocol", _protocol); - flowFile->addAttribute("syslog.port", std::to_string(_port)); - session->transfer(flowFile, Success); -}