http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
deleted file mode 100644
index 011ebcf..0000000
--- a/src/FlowControlProtocol.cpp
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- * @file FlowControlProtocol.cpp
- * FlowControlProtocol class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <sys/time.h>
-#include <stdio.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <netinet/tcp.h>
-#include <iostream>
-#include "FlowController.h"
-#include "FlowControlProtocol.h"
-
-int FlowControlProtocol::connectServer(const char *host, uint16_t port)
-{
-       in_addr_t addr;
-       int sock = 0;
-       struct hostent *h;
-#ifdef __MACH__
-       h = gethostbyname(host);
-#else
-       char buf[1024];
-       struct hostent he;
-       int hh_errno;
-       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-       sock = socket(AF_INET, SOCK_STREAM, 0);
-       if (sock < 0)
-       {
-               _logger->log_error("Could not create socket to hostName %s", 
host);
-               return 0;
-       }
-
-#ifndef __MACH__
-       int opt = 1;
-       bool nagle_off = true;
-
-       if (nagle_off)
-       {
-               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, 
sizeof(opt)) < 0)
-               {
-                       _logger->log_error("setsockopt() TCP_NODELAY failed");
-                       close(sock);
-                       return 0;
-               }
-               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&opt, sizeof(opt)) < 0)
-               {
-                       _logger->log_error("setsockopt() SO_REUSEADDR failed");
-                       close(sock);
-                       return 0;
-               }
-       }
-
-       int sndsize = 256*1024;
-       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, 
(int)sizeof(sndsize)) < 0)
-       {
-               _logger->log_error("setsockopt() SO_SNDBUF failed");
-               close(sock);
-               return 0;
-       }
-#endif
-
-       struct sockaddr_in sa;
-       socklen_t socklen;
-       int status;
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = htonl(INADDR_ANY);
-       sa.sin_port = htons(0);
-       socklen = sizeof(sa);
-       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
-       {
-               _logger->log_error("socket bind failed");
-               close(sock);
-               return 0;
-       }
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = addr;
-       sa.sin_port = htons(port);
-       socklen = sizeof(sa);
-
-       status = connect(sock, (struct sockaddr *)&sa, socklen);
-
-       if (status < 0)
-       {
-               _logger->log_error("socket connect failed to %s %d", host, 
port);
-               close(sock);
-               return 0;
-       }
-
-       _logger->log_info("Flow Control Protocol socket %d connect to server %s 
port %d success", sock, host, port);
-
-       return sock;
-}
-
-int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
-{
-       int ret = 0, bytes = 0;
-
-       while (bytes < buflen)
-       {
-               ret = send(_socket, buf+bytes, buflen-bytes, 0);
-               //check for errors
-               if (ret == -1)
-               {
-                       return ret;
-               }
-               bytes+=ret;
-       }
-
-       return bytes;
-}
-
-int FlowControlProtocol::selectClient(int msec)
-{
-       fd_set fds;
-       struct timeval tv;
-    int retval;
-    int fd = _socket;
-
-    FD_ZERO(&fds);
-    FD_SET(fd, &fds);
-
-    tv.tv_sec = msec/1000;
-    tv.tv_usec = (msec % 1000) * 1000;
-
-    if (msec > 0)
-       retval = select(fd+1, &fds, NULL, NULL, &tv);
-    else
-       retval = select(fd+1, &fds, NULL, NULL, NULL);
-
-    if (retval <= 0)
-      return retval;
-    if (FD_ISSET(fd, &fds))
-      return retval;
-    else
-      return 0;
-}
-
-int FlowControlProtocol::readData(uint8_t *buf, int buflen)
-{
-       int sendSize = buflen;
-
-       while (buflen)
-       {
-               int status;
-               status = selectClient(MAX_READ_TIMEOUT);
-               if (status <= 0)
-               {
-                       return status;
-               }
-#ifndef __MACH__
-               status = read(_socket, buf, buflen);
-#else
-               status = recv(_socket, buf, buflen, 0);
-#endif
-               if (status <= 0)
-               {
-                       return status;
-               }
-               buflen -= status;
-               buf += status;
-       }
-
-       return sendSize;
-}
-
-int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
-{
-       uint8_t buffer[sizeof(FlowControlProtocolHeader)];
-
-       uint8_t *data = buffer;
-
-       int status = readData(buffer, sizeof(FlowControlProtocolHeader));
-       if (status <= 0)
-               return status;
-
-       uint32_t value;
-       data = this->decode(data, value);
-       hdr->msgType = value;
-
-       data = this->decode(data, value);
-       hdr->seqNumber = value;
-
-       data = this->decode(data, value);
-       hdr->status = value;
-
-       data = this->decode(data, value);
-       hdr->payloadLen = value;
-
-       return sizeof(FlowControlProtocolHeader);
-}
-
-void FlowControlProtocol::start()
-{
-       if (_reportInterval <= 0)
-               return;
-       if (_running)
-               return;
-       _running = true;
-       _logger->log_info("FlowControl Protocol Start");
-       _thread = new std::thread(run, this);
-       _thread->detach();
-}
-
-void FlowControlProtocol::stop()
-{
-       if (!_running)
-               return;
-       _running = false;
-       _logger->log_info("FlowControl Protocol Stop");
-}
-
-void FlowControlProtocol::run(FlowControlProtocol *protocol)
-{
-       while (protocol->_running)
-       {
-               
std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
-               if (!protocol->_registered)
-               {
-                       // if it is not register yet
-                       protocol->sendRegisterReq();
-                       // protocol->_controller->reload("flow.xml");
-               }
-               else
-                       protocol->sendReportReq();
-       }
-       return;
-}
-
-int FlowControlProtocol::sendRegisterReq()
-{
-       if (_registered)
-       {
-               _logger->log_info("Already registered");
-               return -1;
-       }
-
-       uint16_t port = this->_serverPort;
-
-       if (this->_socket <= 0)
-               this->_socket = connectServer(_serverName.c_str(), port);
-
-       if (this->_socket <= 0)
-               return -1;
-
-       // Calculate the total payload msg size
-       uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 
0) +
-                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, 
this->_controller->getName().size()+1);
-       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-
-       uint8_t *data = new uint8_t[size];
-       uint8_t *start = data;
-
-       // encode the HDR
-       FlowControlProtocolHeader hdr;
-       hdr.msgType = REGISTER_REQ;
-       hdr.payloadLen = payloadSize;
-       hdr.seqNumber  = this->_seqNumber;
-       hdr.status = RESP_SUCCESS;
-       data = this->encode(data, hdr.msgType);
-       data = this->encode(data, hdr.seqNumber);
-       data = this->encode(data, hdr.status);
-       data = this->encode(data, hdr.payloadLen);
-
-       // encode the serial number
-       data = this->encode(data, FLOW_SERIAL_NUMBER);
-       data = this->encode(data, this->_serialNumber, 8);
-
-       // encode the XML name
-       data = this->encode(data, FLOW_XML_NAME);
-       data = this->encode(data, this->_controller->getName());
-
-       // send it
-       int status = sendData(start, size);
-       delete[] start;
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               _logger->log_error("Flow Control Protocol Send Register Req 
failed");
-               return -1;
-       }
-
-       // Looking for register respond
-       status = readHdr(&hdr);
-
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               _logger->log_error("Flow Control Protocol Read Register Resp 
header failed");
-               return -1;
-       }
-       _logger->log_info("Flow Control Protocol receive MsgType %s", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
-       _logger->log_info("Flow Control Protocol receive Seq Num %d", 
hdr.seqNumber);
-       _logger->log_info("Flow Control Protocol receive Resp Code %s", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-       _logger->log_info("Flow Control Protocol receive Payload len %d", 
hdr.payloadLen);
-
-       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
-       {
-               this->_registered = true;
-               this->_seqNumber++;
-               _logger->log_info("Flow Control Protocol Register success");
-               uint8_t *payload = new uint8_t[hdr.payloadLen];
-               uint8_t *payloadPtr = payload;
-               status = readData(payload, hdr.payloadLen);
-               if (status <= 0)
-               {
-                       delete[] payload;
-                       _logger->log_info("Flow Control Protocol Register Read 
Payload fail");
-                       close(_socket);
-                       _socket = 0;
-                       return -1;
-               }
-               while (payloadPtr < (payload + hdr.payloadLen))
-               {
-                       uint32_t msgID;
-                       payloadPtr = this->decode(payloadPtr, msgID);
-                       if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
-                       {
-                               // Fixed 4 bytes
-                               uint32_t reportInterval;
-                               payloadPtr = this->decode(payloadPtr, 
reportInterval);
-                               _logger->log_info("Flow Control Protocol 
receive report interval %d ms", reportInterval);
-                               this->_reportInterval = reportInterval;
-                       }
-                       else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
-                       {
-                               uint32_t xmlLen;
-                               payloadPtr = this->decode(payloadPtr, xmlLen);
-                               _logger->log_info("Flow Control Protocol 
receive XML content length %d", xmlLen);
-                               time_t rawtime;
-                               struct tm *timeinfo;
-                               time(&rawtime);
-                               timeinfo = localtime(&rawtime);
-                               std::string xmlFileName = "flow.";
-                               xmlFileName += asctime(timeinfo);
-                               xmlFileName += ".xml";
-                               std::ofstream fs;
-                               fs.open(xmlFileName.c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
-                               if (fs.is_open())
-                               {
-                                       fs.write((const char *)payloadPtr, 
xmlLen);
-                                       fs.close();
-                                       
this->_controller->reload(xmlFileName.c_str());
-                               }
-                       }
-                       else
-                       {
-                               break;
-                       }
-               }
-               delete[] payload;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else
-       {
-               _logger->log_info("Flow Control Protocol Register fail");
-               close(_socket);
-               _socket = 0;
-               return -1;
-       }
-}
-
-
-int FlowControlProtocol::sendReportReq()
-{
-       uint16_t port = this->_serverPort;
-
-       if (this->_socket <= 0)
-               this->_socket = connectServer(_serverName.c_str(), port);
-
-       if (this->_socket <= 0)
-               return -1;
-
-       // Calculate the total payload msg size
-       uint32_t payloadSize =
-                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, 
this->_controller->getName().size()+1);
-       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
-
-       uint8_t *data = new uint8_t[size];
-       uint8_t *start = data;
-
-       // encode the HDR
-       FlowControlProtocolHeader hdr;
-       hdr.msgType = REPORT_REQ;
-       hdr.payloadLen = payloadSize;
-       hdr.seqNumber  = this->_seqNumber;
-       hdr.status = RESP_SUCCESS;
-       data = this->encode(data, hdr.msgType);
-       data = this->encode(data, hdr.seqNumber);
-       data = this->encode(data, hdr.status);
-       data = this->encode(data, hdr.payloadLen);
-
-       // encode the XML name
-       data = this->encode(data, FLOW_XML_NAME);
-       data = this->encode(data, this->_controller->getName());
-
-       // send it
-       int status = sendData(start, size);
-       delete[] start;
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               _logger->log_error("Flow Control Protocol Send Report Req 
failed");
-               return -1;
-       }
-
-       // Looking for report respond
-       status = readHdr(&hdr);
-
-       if (status <= 0)
-       {
-               close(_socket);
-               _socket = 0;
-               _logger->log_error("Flow Control Protocol Read Report Resp 
header failed");
-               return -1;
-       }
-       _logger->log_info("Flow Control Protocol receive MsgType %s", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
-       _logger->log_info("Flow Control Protocol receive Seq Num %d", 
hdr.seqNumber);
-       _logger->log_info("Flow Control Protocol receive Resp Code %s", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
-       _logger->log_info("Flow Control Protocol receive Payload len %d", 
hdr.payloadLen);
-
-       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
-       {
-               this->_seqNumber++;
-               uint8_t *payload = new uint8_t[hdr.payloadLen];
-               uint8_t *payloadPtr = payload;
-               status = readData(payload, hdr.payloadLen);
-               if (status <= 0)
-               {
-                       delete[] payload;
-                       _logger->log_info("Flow Control Protocol Report Resp 
Read Payload fail");
-                       close(_socket);
-                       _socket = 0;
-                       return -1;
-               }
-               std::string processor;
-               std::string propertyName;
-               std::string propertyValue;
-               while (payloadPtr < (payload + hdr.payloadLen))
-               {
-                       uint32_t msgID;
-                       payloadPtr = this->decode(payloadPtr, msgID);
-                       if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               processor = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               _logger->log_info("Flow Control Protocol 
receive report resp processor %s", processor.c_str());
-                       }
-                       else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               propertyName = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               _logger->log_info("Flow Control Protocol 
receive report resp property name %s", propertyName.c_str());
-                       }
-                       else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
-                       {
-                               uint32_t len;
-                               payloadPtr = this->decode(payloadPtr, len);
-                               propertyValue = (const char *) payloadPtr;
-                               payloadPtr += len;
-                               _logger->log_info("Flow Control Protocol 
receive report resp property value %s", propertyValue.c_str());
-                               
this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
-                       }
-                       else
-                       {
-                               break;
-                       }
-               }
-               delete[] payload;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == 
this->_seqNumber)
-       {
-               _logger->log_info("Flow Control Protocol trigger reregister");
-               this->_registered = false;
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == 
this->_seqNumber)
-       {
-               _logger->log_info("Flow Control Protocol stop flow controller");
-               this->_controller->stop(true);
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == 
this->_seqNumber)
-       {
-               _logger->log_info("Flow Control Protocol start flow 
controller");
-               this->_controller->start();
-               this->_seqNumber++;
-               close(_socket);
-               _socket = 0;
-               return 0;
-       }
-       else
-       {
-               _logger->log_info("Flow Control Protocol Report fail");
-               close(_socket);
-               _socket = 0;
-               return -1;
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
deleted file mode 100644
index 8fbe3dc..0000000
--- a/src/FlowController.cpp
+++ /dev/null
@@ -1,1190 +0,0 @@
-/**
- * @file FlowController.cpp
- * FlowController class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <libxml/parser.h>
-#include <libxml/tree.h>
-
-#include "FlowController.h"
-#include "ProcessContext.h"
-
-FlowController::FlowController(std::string name)
-: _name(name)
-{
-       uuid_generate(_uuid);
-
-       // Setup the default values
-       _configurationFileName = DEFAULT_FLOW_YAML_FILE_NAME;
-       _maxEventDrivenThreads = DEFAULT_MAX_EVENT_DRIVEN_THREAD;
-       _maxTimerDrivenThreads = DEFAULT_MAX_TIMER_DRIVEN_THREAD;
-       _running = false;
-       _initialized = false;
-       _root = NULL;
-       _logger = Logger::getLogger();
-       _protocol = new FlowControlProtocol(this);
-
-       // NiFi config properties
-       _configure = Configure::getConfigure();
-
-       std::string rawConfigFileString;
-       _configure->get(Configure::nifi_flow_configuration_file, 
rawConfigFileString);
-
-       if (!rawConfigFileString.empty())
-       {
-               _configurationFileName = rawConfigFileString;
-       }
-
-       char *path = NULL;
-       char full_path[PATH_MAX];
-
-       std::string adjustedFilename;
-       if (!_configurationFileName.empty())
-       {
-               // perform a naive determination if this is a relative path
-               if (_configurationFileName.c_str()[0] != '/')
-               {
-                       adjustedFilename = adjustedFilename + 
_configure->getHome() + "/" + _configurationFileName;
-               }
-               else
-               {
-                       adjustedFilename = _configurationFileName;
-               }
-       }
-
-       path = realpath(adjustedFilename.c_str(), full_path);
-       if (!path)
-       {
-               _logger->log_error("Could not locate path from provided 
configuration file name.");
-       }
-
-       std::string pathString(path);
-       _configurationFileName = pathString;
-       _logger->log_info("FlowController NiFi Configuration file %s", 
pathString.c_str());
-
-       // Create repos for flow record and provenance
-
-       _logger->log_info("FlowController %s created", _name.c_str());
-}
-
-FlowController::~FlowController()
-{
-       stop(true);
-       unload();
-       delete _protocol;
-}
-
-bool FlowController::isRunning()
-{
-       return (_running);
-}
-
-bool FlowController::isInitialized()
-{
-       return (_initialized);
-}
-
-void FlowController::stop(bool force)
-{
-       if (_running)
-       {
-               _logger->log_info("Stop Flow Controller");
-               this->_timerScheduler.stop();
-               // Wait for sometime for thread stop
-               std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-               if (this->_root)
-                       this->_root->stopProcessing(&this->_timerScheduler);
-               _running = false;
-       }
-}
-
-void FlowController::unload()
-{
-       if (_running)
-       {
-               stop(true);
-       }
-       if (_initialized)
-       {
-               _logger->log_info("Unload Flow Controller");
-               if (_root)
-                       delete _root;
-               _root = NULL;
-               _initialized = false;
-               _name = "";
-       }
-
-       return;
-}
-
-void FlowController::reload(std::string xmlFile)
-{
-       _logger->log_info("Starting to reload Flow Controller with xml %s", 
xmlFile.c_str());
-       stop(true);
-       unload();
-       std::string oldxmlFile = this->_configurationFileName;
-       this->_configurationFileName = xmlFile;
-       load(ConfigFormat::XML);
-       start();
-       if (!this->_root)
-       {
-               this->_configurationFileName = oldxmlFile;
-               _logger->log_info("Rollback Flow Controller to xml %s", 
oldxmlFile.c_str());
-               stop(true);
-               unload();
-               load(ConfigFormat::XML);
-               start();
-       }
-}
-
-Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
-{
-       Processor *processor = NULL;
-       if (name == GenerateFlowFile::ProcessorName)
-       {
-               processor = new GenerateFlowFile(name, uuid);
-       }
-       else if (name == LogAttribute::ProcessorName)
-       {
-               processor = new LogAttribute(name, uuid);
-       }
-       else if (name == RealTimeDataCollector::ProcessorName)
-       {
-               processor = new RealTimeDataCollector(name, uuid);
-       }
-       else if (name == GetFile::ProcessorName)
-       {
-               processor = new GetFile(name, uuid);
-       }
-       else if (name == TailFile::ProcessorName)
-       {
-               processor = new TailFile(name, uuid);
-       }
-       else if (name == ListenSyslog::ProcessorName)
-       {
-               processor = new ListenSyslog(name, uuid);
-       }
-       else if (name == ExecuteProcess::ProcessorName)
-       {
-               processor = new ExecuteProcess(name, uuid);
-       }
-       else
-       {
-               _logger->log_error("No Processor defined for %s", name.c_str());
-               return NULL;
-       }
-
-       //! initialize the processor
-       processor->initialize();
-
-       return processor;
-}
-
-ProcessGroup *FlowController::createRootProcessGroup(std::string name, uuid_t 
uuid)
-{
-       return new ProcessGroup(ROOT_PROCESS_GROUP, name, uuid);
-}
-
-ProcessGroup *FlowController::createRemoteProcessGroup(std::string name, 
uuid_t uuid)
-{
-       return new ProcessGroup(REMOTE_PROCESS_GROUP, name, uuid);
-}
-
-Connection *FlowController::createConnection(std::string name, uuid_t uuid)
-{
-       return new Connection(name, uuid);
-}
-
-void FlowController::parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup 
*parent)
-{
-       uuid_t uuid;
-       xmlNode *currentNode;
-       Connection *connection = NULL;
-
-       if (!parent)
-       {
-               _logger->log_error("parseProcessNode: no parent group existed");
-               return;
-       }
-
-       // generate the random UIID
-       uuid_generate(uuid);
-
-       for (currentNode = node->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next)
-       {
-               if (currentNode->type == XML_ELEMENT_NODE)
-               {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0)
-                       {
-                               char *id = (char *) 
xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       _logger->log_debug("parseConnection: id 
=> [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"name") == 0) {
-                               char *name = (char *) 
xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       _logger->log_debug("parseConnection: 
name => [%s]", name);
-                                       connection = 
this->createConnection(name, uuid);
-                                       if (connection == NULL) {
-                                               xmlFree(name);
-                                               return;
-                                       }
-                                       xmlFree(name);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"sourceId") == 0) {
-                               char *id = (char *) 
xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       _logger->log_debug("parseConnection: 
sourceId => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                                       if (connection)
-                                               
connection->setSourceProcessorUUID(uuid);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"destinationId") == 0) {
-                               char *id = (char *) 
xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       _logger->log_debug("parseConnection: 
destinationId => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                                       if (connection)
-                                               
connection->setDestinationProcessorUUID(uuid);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"maxWorkQueueSize") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               int64_t maxWorkQueueSize = 0;
-                               if (temp) {
-                                       if (Property::StringToInt(temp, 
maxWorkQueueSize)) {
-                                               
_logger->log_debug("parseConnection: maxWorkQueueSize => [%d]", 
maxWorkQueueSize);
-                                               if (connection)
-                                                       
connection->setMaxQueueSize(maxWorkQueueSize);
-
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"maxWorkQueueDataSize") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               int64_t maxWorkQueueDataSize = 0;
-                               if (temp) {
-                                       if (Property::StringToInt(temp, 
maxWorkQueueDataSize)) {
-                                               
_logger->log_debug("parseConnection: maxWorkQueueDataSize => [%d]", 
maxWorkQueueDataSize);
-                                               if (connection)
-                                                       
connection->setMaxQueueDataSize(maxWorkQueueDataSize);
-
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"relationship") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       std::string relationshipName = temp;
-                                       if (!relationshipName.empty()) {
-                                               Relationship 
relationship(relationshipName, "");
-                                               
_logger->log_debug("parseConnection: relationship => [%s]", 
relationshipName.c_str());
-                                               if (connection)
-                                                       
connection->setRelationship(relationship);
-                                       } else {
-                                               Relationship empty;
-                                               
_logger->log_debug("parseConnection: relationship => [%s]", 
empty.getName().c_str());
-                                               if (connection)
-                                                       
connection->setRelationship(empty);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // for node
-
-       if (connection)
-               parent->addConnection(connection);
-
-       return;
-}
-
-void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) {
-       uuid_t uuid;
-       xmlNode *currentNode;
-       ProcessGroup *group = NULL;
-
-       // generate the random UIID
-       uuid_generate(uuid);
-
-       for (currentNode = node->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next) {
-               if (currentNode->type == XML_ELEMENT_NODE) {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-                               char *id = (char *) 
xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       
_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"name") == 0) {
-                               char *name = (char *) 
xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       
_logger->log_debug("parseRootProcessGroup: name => [%s]", name);
-                                       group = 
this->createRootProcessGroup(name, uuid);
-                                       if (group == NULL) {
-                                               xmlFree(name);
-                                               return;
-                                       }
-                                       // Set the root process group
-                                       this->_root = group;
-                                       this->_name = name;
-                                       xmlFree(name);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"processor") == 0) {
-                               this->parseProcessorNode(doc, currentNode, 
group);
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"connection") == 0) {
-                               this->parseConnection(doc, currentNode, group);
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"remoteProcessGroup") == 0) {
-                               this->parseRemoteProcessGroup(doc, currentNode, 
group);
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // for node
-}
-
-void FlowController::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
-       uuid_t uuid;
-       ProcessGroup *group = NULL;
-
-       // generate the random UIID
-       uuid_generate(uuid);
-
-       std::string flowName = rootFlowNode["name"].as<std::string>();
-
-       char uuidStr[37];
-       uuid_unparse(_uuid, uuidStr);
-       _logger->log_debug("parseRootProcessGroup: id => [%s]", uuidStr);
-       _logger->log_debug("parseRootProcessGroup: name => [%s]", 
flowName.c_str());
-       group = this->createRootProcessGroup(flowName, uuid);
-       this->_root = group;
-       this->_name = flowName;
-}
-
-void FlowController::parseProcessorNodeYaml(YAML::Node processorsNode, 
ProcessGroup *parentGroup) {
-       int64_t schedulingPeriod = -1;
-       int64_t penalizationPeriod = -1;
-       int64_t yieldPeriod = -1;
-       int64_t runDurationNanos = -1;
-       uuid_t uuid;
-       Processor *processor = NULL;
-
-       if (!parentGroup) {
-               _logger->log_error("parseProcessNodeYaml: no parent group 
exists");
-               return;
-       }
-
-       if (processorsNode) {
-               // Evaluate sequence of processors
-               int numProcessors = processorsNode.size();
-               if (numProcessors < 1) {
-                       throw new std::invalid_argument("There must be at least 
one processor configured.");
-               }
-
-               std::vector<ProcessorConfig> processorConfigs;
-
-               if (processorsNode.IsSequence()) {
-                       for (YAML::const_iterator iter = 
processorsNode.begin(); iter != processorsNode.end(); ++iter) {
-                               ProcessorConfig procCfg;
-                               YAML::Node procNode = iter->as<YAML::Node>();
-
-                               procCfg.name = 
procNode["name"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: name => 
[%s]", procCfg.name.c_str());
-                               procCfg.javaClass = 
procNode["class"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: class 
=> [%s]", procCfg.javaClass.c_str());
-
-                               char uuidStr[37];
-                               uuid_unparse(_uuid, uuidStr);
-
-                               // generate the random UUID
-                               uuid_generate(uuid);
-
-                               // Determine the processor name only from the 
Java class
-                               int lastOfIdx = 
procCfg.javaClass.find_last_of(".");
-                               if (lastOfIdx != std::string::npos) {
-                                       lastOfIdx++; // if a value is found, 
increment to move beyond the .
-                                       int nameLength = 
procCfg.javaClass.length() - lastOfIdx;
-                                       std::string processorName = 
procCfg.javaClass.substr(lastOfIdx, nameLength);
-                                       processor = 
this->createProcessor(processorName, uuid);
-                               }
-
-                               if (!processor) {
-                                       _logger->log_error("Could not create a 
processor %s with name %s", procCfg.name.c_str(), uuidStr);
-                                       throw std::invalid_argument("Could not 
create processor " + procCfg.name);
-                               }
-                               processor->setName(procCfg.name);
-
-                               procCfg.maxConcurrentTasks = procNode["max 
concurrent tasks"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: max 
concurrent tasks => [%s]", procCfg.maxConcurrentTasks.c_str());
-                               procCfg.schedulingStrategy = 
procNode["scheduling strategy"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: 
scheduling strategy => [%s]",
-                                               
procCfg.schedulingStrategy.c_str());
-                               procCfg.schedulingPeriod = procNode["scheduling 
period"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: 
scheduling period => [%s]", procCfg.schedulingPeriod.c_str());
-                               procCfg.penalizationPeriod = 
procNode["penalization period"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: 
penalization period => [%s]",
-                                               
procCfg.penalizationPeriod.c_str());
-                               procCfg.yieldPeriod = procNode["yield 
period"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: yield 
period => [%s]", procCfg.yieldPeriod.c_str());
-                               procCfg.yieldPeriod = procNode["run duration 
nanos"].as<std::string>();
-                               _logger->log_debug("parseProcessorNode: run 
duration nanos => [%s]", procCfg.runDurationNanos.c_str());
-
-                               // handle auto-terminated relationships
-                               YAML::Node autoTerminatedSequence = 
procNode["auto-terminated relationships list"];
-                               std::vector<std::string> 
rawAutoTerminatedRelationshipValues;
-                               if (autoTerminatedSequence.IsSequence() && 
!autoTerminatedSequence.IsNull()
-                                               && 
autoTerminatedSequence.size() > 0) {
-                                       for (YAML::const_iterator relIter = 
autoTerminatedSequence.begin();
-                                                       relIter != 
autoTerminatedSequence.end(); ++relIter) {
-                                               std::string autoTerminatedRel = 
relIter->as<std::string>();
-                                               
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel);
-                                       }
-                               }
-                               procCfg.autoTerminatedRelationships = 
rawAutoTerminatedRelationshipValues;
-
-                               // handle processor properties
-                               YAML::Node propertiesNode = 
procNode["Properties"];
-                               parsePropertiesNodeYaml(&propertiesNode, 
processor);
-
-                               // Take care of scheduling
-                               TimeUnit unit;
-                               if 
(Property::StringToTime(procCfg.schedulingPeriod, schedulingPeriod, unit)
-                                               && 
Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-                                       _logger->log_debug("convert: 
parseProcessorNode: schedulingPeriod => [%d] ns", schedulingPeriod);
-                                       
processor->setSchedulingPeriodNano(schedulingPeriod);
-                               }
-
-                               if 
(Property::StringToTime(procCfg.penalizationPeriod, penalizationPeriod, unit)
-                                               && 
Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
-                                       _logger->log_debug("convert: 
parseProcessorNode: penalizationPeriod => [%d] ms",
-                                                       penalizationPeriod);
-                                       
processor->setPenalizationPeriodMsec(penalizationPeriod);
-                               }
-
-                               if (Property::StringToTime(procCfg.yieldPeriod, 
yieldPeriod, unit)
-                                               && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
-                                       _logger->log_debug("convert: 
parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
-                                       
processor->setYieldPeriodMsec(yieldPeriod);
-                               }
-
-                               // Default to running
-                               processor->setScheduledState(RUNNING);
-
-                               if (procCfg.schedulingStrategy == 
"TIMER_DRIVEN") {
-                                       
processor->setSchedulingStrategy(TIMER_DRIVEN);
-                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
-                               } else if (procCfg.schedulingStrategy == 
"EVENT_DRIVEN") {
-                                       
processor->setSchedulingStrategy(EVENT_DRIVEN);
-                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
-                               } else {
-                                       
processor->setSchedulingStrategy(CRON_DRIVEN);
-                                       _logger->log_debug("setting scheduling 
strategy as %s", procCfg.schedulingStrategy.c_str());
-
-                               }
-
-                               int64_t maxConcurrentTasks;
-                               if 
(Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
-                                       _logger->log_debug("parseProcessorNode: 
maxConcurrentTasks => [%d]", maxConcurrentTasks);
-                                       
processor->setMaxConcurrentTasks(maxConcurrentTasks);
-                               }
-
-                               if 
(Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
-                                       _logger->log_debug("parseProcessorNode: 
runDurationNanos => [%d]", runDurationNanos);
-                                       
processor->setRunDurationNano(runDurationNanos);
-                               }
-
-                               std::set<Relationship> 
autoTerminatedRelationships;
-                               for (auto&& relString : 
procCfg.autoTerminatedRelationships) {
-                                       Relationship relationship(relString, 
"");
-                                       _logger->log_debug("parseProcessorNode: 
autoTerminatedRelationship  => [%s]", relString.c_str());
-                                       
autoTerminatedRelationships.insert(relationship);
-                               }
-
-                               
processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
-
-                               parentGroup->addProcessor(processor);
-                       }
-               }
-       } else {
-               throw new std::invalid_argument(
-                               "Cannot instantiate a MiNiFi instance without a 
defined Processors configuration node.");
-       }
-}
-
-void FlowController::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, 
ProcessGroup *parentGroup) {
-       uuid_t uuid;
-
-       if (!parentGroup) {
-               _logger->log_error("parseRemoteProcessGroupYaml: no parent 
group exists");
-               return;
-       }
-
-       if (rpgNode) {
-               if (rpgNode->IsSequence()) {
-                       for (YAML::const_iterator iter = rpgNode->begin(); iter 
!= rpgNode->end(); ++iter) {
-                               YAML::Node rpgNode = iter->as<YAML::Node>();
-
-                               auto name = rpgNode["name"].as<std::string>();
-                               
_logger->log_debug("parseRemoteProcessGroupYaml: name => [%s]", name.c_str());
-
-                               std::string url = 
rpgNode["url"].as<std::string>();
-                               
_logger->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url.c_str());
-
-                               std::string timeout = 
rpgNode["timeout"].as<std::string>();
-                               
_logger->log_debug("parseRemoteProcessGroupYaml: timeout => [%s]", 
timeout.c_str());
-
-                               std::string yieldPeriod = rpgNode["yield 
period"].as<std::string>();
-                               
_logger->log_debug("parseRemoteProcessGroupYaml: yield period => [%s]", 
yieldPeriod.c_str());
-
-                               YAML::Node inputPorts = rpgNode["Input 
Ports"].as<YAML::Node>();
-                               ProcessGroup* group = NULL;
-
-                               // generate the random UUID
-                               uuid_generate(uuid);
-
-                               char uuidStr[37];
-                               uuid_unparse(_uuid, uuidStr);
-
-                               int64_t timeoutValue = -1;
-                               int64_t yieldPeriodValue = -1;
-
-                               group = 
this->createRemoteProcessGroup(name.c_str(), uuid);
-                               group->setParent(parentGroup);
-                               parentGroup->addProcessGroup(group);
-
-                               TimeUnit unit;
-
-                               if (Property::StringToTime(yieldPeriod, 
yieldPeriodValue, unit)
-                                                       && 
Property::ConvertTimeUnitToMS(yieldPeriodValue, unit, yieldPeriodValue) && 
group) {
-                                       
_logger->log_debug("parseRemoteProcessGroupYaml: yieldPeriod => [%d] ms", 
yieldPeriodValue);
-                                       
group->setYieldPeriodMsec(yieldPeriodValue);
-                               }
-
-                               if (Property::StringToTime(timeout, 
timeoutValue, unit)
-                                       && 
Property::ConvertTimeUnitToMS(timeoutValue, unit, timeoutValue) && group) {
-                                       
_logger->log_debug("parseRemoteProcessGroupYaml: timeoutValue => [%d] ms", 
timeoutValue);
-                                       group->setTimeOut(timeoutValue);
-                               }
-
-                               group->setTransmitting(true);
-                               group->setURL(url);
-
-                               if (inputPorts.IsSequence()) {
-                                       for (YAML::const_iterator portIter = 
inputPorts.begin(); portIter != inputPorts.end(); ++portIter) {
-                                               _logger->log_debug("Got a 
current port, iterating...");
-
-                                               YAML::Node currPort = 
portIter->as<YAML::Node>();
-
-                                               this->parsePortYaml(&currPort, 
group, SEND);
-                                       } // for node
-                               }
-                       }
-               }
-       }
-}
-
-void FlowController::parseConnectionYaml(YAML::Node *connectionsNode, 
ProcessGroup *parent) {
-       uuid_t uuid;
-       Connection *connection = NULL;
-
-       if (!parent) {
-               _logger->log_error("parseProcessNode: no parent group was 
provided");
-               return;
-       }
-
-       if (connectionsNode) {
-               int numConnections = connectionsNode->size();
-               if (numConnections < 1) {
-                       throw new std::invalid_argument("There must be at least 
one connection configured.");
-               }
-
-               if (connectionsNode->IsSequence()) {
-                       for (YAML::const_iterator iter = 
connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
-                               // generate the random UIID
-                               uuid_generate(uuid);
-
-                               YAML::Node connectionNode = 
iter->as<YAML::Node>();
-
-                               std::string name = 
connectionNode["name"].as<std::string>();
-                               std::string destName = 
connectionNode["destination name"].as<std::string>();
-
-                               char uuidStr[37];
-                               uuid_unparse(_uuid, uuidStr);
-
-                               _logger->log_debug("Created connection with 
UUID %s and name %s", uuidStr, name.c_str());
-                               connection = this->createConnection(name, uuid);
-                               auto rawRelationship = connectionNode["source 
relationship name"].as<std::string>();
-                               Relationship relationship(rawRelationship, "");
-                               _logger->log_debug("parseConnection: 
relationship => [%s]", rawRelationship.c_str());
-                               if (connection)
-                                       
connection->setRelationship(relationship);
-                               std::string connectionSrcProcName = 
connectionNode["source name"].as<std::string>();
-
-                               Processor *srcProcessor = 
this->_root->findProcessor(connectionSrcProcName);
-
-                               if (!srcProcessor) {
-                                       _logger->log_error("Could not locate a 
source with name %s to create a connection",
-                                                       
connectionSrcProcName.c_str());
-                                       throw std::invalid_argument(
-                                                       "Could not locate a 
source with name %s to create a connection " + connectionSrcProcName);
-                               }
-
-                               Processor *destProcessor = 
this->_root->findProcessor(destName);
-                               // If we could not find name, try by UUID
-                               if (!destProcessor) {
-                                       uuid_t destUuid;
-                                       uuid_parse(destName.c_str(), destUuid);
-                                       destProcessor = 
this->_root->findProcessor(destUuid);
-                               }
-                               if (destProcessor) {
-                                       std::string destUuid = 
destProcessor->getUUIDStr();
-                               }
-
-                               uuid_t srcUuid;
-                               uuid_t destUuid;
-                               srcProcessor->getUUID(srcUuid);
-                               connection->setSourceProcessorUUID(srcUuid);
-                               destProcessor->getUUID(destUuid);
-                               
connection->setDestinationProcessorUUID(destUuid);
-
-                               if (connection) {
-                                       parent->addConnection(connection);
-                               }
-                       }
-               }
-
-               if (connection)
-                       parent->addConnection(connection);
-
-               return;
-       }
-}
-
-void FlowController::parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, 
ProcessGroup *parent) {
-       uuid_t uuid;
-       xmlNode *currentNode;
-       ProcessGroup *group = NULL;
-       int64_t yieldPeriod = -1;
-       int64_t timeOut = -1;
-
-// generate the random UIID
-       uuid_generate(uuid);
-
-       for (currentNode = node->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next) {
-               if (currentNode->type == XML_ELEMENT_NODE) {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-                               char *id = (char *) 
xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       
_logger->log_debug("parseRootProcessGroup: id => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"name") == 0) {
-                               char *name = (char *) 
xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       
_logger->log_debug("parseRemoteProcessGroup: name => [%s]", name);
-                                       group = 
this->createRemoteProcessGroup(name, uuid);
-                                       if (group == NULL) {
-                                               xmlFree(name);
-                                               return;
-                                       }
-                                       group->setParent(parent);
-                                       parent->addProcessGroup(group);
-                                       xmlFree(name);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"yieldPeriod") == 0) {
-                               TimeUnit unit;
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToTime(temp, 
yieldPeriod, unit)
-                                                       && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod) && group) {
-                                               
_logger->log_debug("parseRemoteProcessGroup: yieldPeriod => [%d] ms", 
yieldPeriod);
-                                               
group->setYieldPeriodMsec(yieldPeriod);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"timeout") == 0) {
-                               TimeUnit unit;
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToTime(temp, 
timeOut, unit)
-                                                       && 
Property::ConvertTimeUnitToMS(timeOut, unit, timeOut) && group) {
-                                               
_logger->log_debug("parseRemoteProcessGroup: timeOut => [%d] ms", timeOut);
-                                               group->setTimeOut(timeOut);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"transmitting") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               bool transmitting;
-                               if (temp) {
-                                       if (Property::StringToBool(temp, 
transmitting) && group) {
-                                               
_logger->log_debug("parseRemoteProcessGroup: transmitting => [%d]", 
transmitting);
-                                               
group->setTransmitting(transmitting);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"inputPort") == 0 && group) {
-                               this->parsePort(doc, currentNode, group, SEND);
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"outputPort") == 0 && group) {
-                               this->parsePort(doc, currentNode, group, 
RECEIVE);
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // for node
-}
-
-void FlowController::parseProcessorProperty(xmlDoc *doc, xmlNode *node, 
Processor *processor) {
-       xmlNode *currentNode;
-       std::string propertyValue;
-       std::string propertyName;
-
-       if (!processor) {
-               _logger->log_error("parseProcessorProperty: no parent processor 
existed");
-               return;
-       }
-
-       for (currentNode = node->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next) {
-               if (currentNode->type == XML_ELEMENT_NODE) {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "name") == 0) 
{
-                               char *name = (char *) 
xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       _logger->log_debug("parseProcessorNode: 
name => [%s]", name);
-                                       propertyName = name;
-                                       xmlFree(name);
-                               }
-                       }
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "value") == 
0) {
-                               char *value = (char *) 
xmlNodeGetContent(currentNode);
-                               if (value) {
-                                       _logger->log_debug("parseProcessorNode: 
value => [%s]", value);
-                                       propertyValue = value;
-                                       xmlFree(value);
-                               }
-                       }
-                       if (!propertyName.empty() && !propertyValue.empty()) {
-                               processor->setProperty(propertyName, 
propertyValue);
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // for node
-}
-
-void FlowController::parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, 
TransferDirection direction) {
-       uuid_t uuid;
-       Processor *processor = NULL;
-       RemoteProcessorGroupPort *port = NULL;
-
-       if (!parent) {
-               _logger->log_error("parseProcessNode: no parent group existed");
-               return;
-       }
-
-       YAML::Node inputPortsObj = portNode->as<YAML::Node>();
-
-       // generate the random UIID
-       uuid_generate(uuid);
-
-       auto portId = inputPortsObj["id"].as<std::string>();
-       auto nameStr = inputPortsObj["name"].as<std::string>();
-       uuid_parse(portId.c_str(), uuid);
-
-       port = new RemoteProcessorGroupPort(nameStr.c_str(), uuid);
-
-       processor = (Processor *) port;
-       port->setDirection(direction);
-       port->setTimeOut(parent->getTimeOut());
-       port->setTransmitting(true);
-       processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-       processor->initialize();
-
-       // handle port properties
-       YAML::Node nodeVal = portNode->as<YAML::Node>();
-       YAML::Node propertiesNode = nodeVal["Properties"];
-
-       parsePropertiesNodeYaml(&propertiesNode, processor);
-
-       // add processor to parent
-       parent->addProcessor(processor);
-       processor->setScheduledState(RUNNING);
-       auto rawMaxConcurrentTasks = inputPortsObj["max concurrent 
tasks"].as<std::string>();
-       int64_t maxConcurrentTasks;
-       if (Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) {
-               processor->setMaxConcurrentTasks(maxConcurrentTasks);
-       }
-       _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", 
maxConcurrentTasks);
-       processor->setMaxConcurrentTasks(maxConcurrentTasks);
-
-}
-
-void FlowController::parsePort(xmlDoc *doc, xmlNode *processorNode, 
ProcessGroup *parent, TransferDirection direction) {
-       char *id = NULL;
-       char *name = NULL;
-       uuid_t uuid;
-       xmlNode *currentNode;
-       Processor *processor = NULL;
-       RemoteProcessorGroupPort *port = NULL;
-
-       if (!parent) {
-               _logger->log_error("parseProcessNode: no parent group existed");
-               return;
-       }
-// generate the random UIID
-       uuid_generate(uuid);
-
-       for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next) {
-               if (currentNode->type == XML_ELEMENT_NODE) {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-                               id = (char *) xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       _logger->log_debug("parseProcessorNode: 
id => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"name") == 0) {
-                               name = (char *) xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       _logger->log_debug("parseProcessorNode: 
name => [%s]", name);
-                                       port = new 
RemoteProcessorGroupPort(name, uuid);
-                                       processor = (Processor *) port;
-                                       if (processor == NULL) {
-                                               xmlFree(name);
-                                               return;
-                                       }
-                                       port->setDirection(direction);
-                                       port->setTimeOut(parent->getTimeOut());
-                                       
port->setTransmitting(parent->getTransmitting());
-                                       
processor->setYieldPeriodMsec(parent->getYieldPeriodMsec());
-                                       processor->initialize();
-                                       // add processor to parent
-                                       parent->addProcessor(processor);
-                                       xmlFree(name);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"scheduledState") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       std::string state = temp;
-                                       if (state == "DISABLED") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(DISABLED);
-                                       }
-                                       if (state == "STOPPED") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(STOPPED);
-                                       }
-                                       if (state == "RUNNING") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(RUNNING);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"maxConcurrentTasks") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       int64_t maxConcurrentTasks;
-                                       if (Property::StringToInt(temp, 
maxConcurrentTasks)) {
-                                               
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", 
maxConcurrentTasks);
-                                               
processor->setMaxConcurrentTasks(maxConcurrentTasks);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"property") == 0) {
-                               this->parseProcessorProperty(doc, currentNode, 
processor);
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // while node
-}
-
-void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, 
ProcessGroup *parent) {
-       char *id = NULL;
-       char *name = NULL;
-       int64_t schedulingPeriod = -1;
-       int64_t penalizationPeriod = -1;
-       int64_t yieldPeriod = -1;
-       bool lossTolerant = false;
-       int64_t runDurationNanos = -1;
-       uuid_t uuid;
-       xmlNode *currentNode;
-       Processor *processor = NULL;
-
-       if (!parent) {
-               _logger->log_error("parseProcessNode: no parent group existed");
-               return;
-       }
-// generate the random UIID
-       uuid_generate(uuid);
-
-       for (currentNode = processorNode->xmlChildrenNode; currentNode != NULL; 
currentNode = currentNode->next) {
-               if (currentNode->type == XML_ELEMENT_NODE) {
-                       if (xmlStrcmp(currentNode->name, BAD_CAST "id") == 0) {
-                               id = (char *) xmlNodeGetContent(currentNode);
-                               if (id) {
-                                       _logger->log_debug("parseProcessorNode: 
id => [%s]", id);
-                                       uuid_parse(id, uuid);
-                                       xmlFree(id);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"name") == 0) {
-                               name = (char *) xmlNodeGetContent(currentNode);
-                               if (name) {
-                                       _logger->log_debug("parseProcessorNode: 
name => [%s]", name);
-                                       processor = this->createProcessor(name, 
uuid);
-                                       if (processor == NULL) {
-                                               xmlFree(name);
-                                               return;
-                                       }
-                                       // add processor to parent
-                                       parent->addProcessor(processor);
-                                       xmlFree(name);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"schedulingPeriod") == 0) {
-                               TimeUnit unit;
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToTime(temp, 
schedulingPeriod, unit)
-                                                       && 
Property::ConvertTimeUnitToNS(schedulingPeriod, unit, schedulingPeriod)) {
-                                               
_logger->log_debug("parseProcessorNode: schedulingPeriod => [%d] ns", 
schedulingPeriod);
-                                               
processor->setSchedulingPeriodNano(schedulingPeriod);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"penalizationPeriod") == 0) {
-                               TimeUnit unit;
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToTime(temp, 
penalizationPeriod, unit)
-                                                       && 
Property::ConvertTimeUnitToMS(penalizationPeriod, unit, penalizationPeriod)) {
-                                               
_logger->log_debug("parseProcessorNode: penalizationPeriod => [%d] ms", 
penalizationPeriod);
-                                               
processor->setPenalizationPeriodMsec(penalizationPeriod);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"yieldPeriod") == 0) {
-                               TimeUnit unit;
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToTime(temp, 
yieldPeriod, unit)
-                                                       && 
Property::ConvertTimeUnitToMS(yieldPeriod, unit, yieldPeriod)) {
-                                               
_logger->log_debug("parseProcessorNode: yieldPeriod => [%d] ms", yieldPeriod);
-                                               
processor->setYieldPeriodMsec(yieldPeriod);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"lossTolerant") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToBool(temp, 
lossTolerant)) {
-                                               
_logger->log_debug("parseProcessorNode: lossTolerant => [%d]", lossTolerant);
-                                               
processor->setlossTolerant(lossTolerant);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"scheduledState") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       std::string state = temp;
-                                       if (state == "DISABLED") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(DISABLED);
-                                       }
-                                       if (state == "STOPPED") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(STOPPED);
-                                       }
-                                       if (state == "RUNNING") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledState  => [%s]", 
state.c_str());
-                                               
processor->setScheduledState(RUNNING);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"schedulingStrategy") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       std::string strategy = temp;
-                                       if (strategy == "TIMER_DRIVEN") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", 
strategy.c_str());
-                                               
processor->setSchedulingStrategy(TIMER_DRIVEN);
-                                       }
-                                       if (strategy == "EVENT_DRIVEN") {
-                                               
_logger->log_debug("parseProcessorNode: scheduledStrategy  => [%s]", 
strategy.c_str());
-                                               
processor->setSchedulingStrategy(EVENT_DRIVEN);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"maxConcurrentTasks") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       int64_t maxConcurrentTasks;
-                                       if (Property::StringToInt(temp, 
maxConcurrentTasks)) {
-                                               
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", 
maxConcurrentTasks);
-                                               
processor->setMaxConcurrentTasks(maxConcurrentTasks);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"runDurationNanos") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       if (Property::StringToInt(temp, 
runDurationNanos)) {
-                                               
_logger->log_debug("parseProcessorNode: runDurationNanos => [%d]", 
runDurationNanos);
-                                               
processor->setRunDurationNano(runDurationNanos);
-                                       }
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"autoTerminatedRelationship") == 0) {
-                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                               if (temp) {
-                                       std::string relationshipName = temp;
-                                       Relationship 
relationship(relationshipName, "");
-                                       std::set<Relationship> relationships;
-
-                                       relationships.insert(relationship);
-                                       
processor->setAutoTerminatedRelationships(relationships);
-                                       _logger->log_debug("parseProcessorNode: 
autoTerminatedRelationship  => [%s]",
-                                                       
relationshipName.c_str());
-                                       xmlFree(temp);
-                               }
-                       } else if (xmlStrcmp(currentNode->name, BAD_CAST 
"property") == 0) {
-                               this->parseProcessorProperty(doc, currentNode, 
processor);
-                       }
-               } // if (currentNode->type == XML_ELEMENT_NODE)
-       } // while node
-}
-
-void FlowController::parsePropertiesNodeYaml(YAML::Node *propertiesNode, 
Processor *processor)
-{
-    // Treat generically as a YAML node so we can perform inspection on 
entries to ensure they are populated
-    for (YAML::const_iterator propsIter = propertiesNode->begin(); propsIter 
!= propertiesNode->end(); ++propsIter)
-    {
-        std::string propertyName = propsIter->first.as<std::string>();
-        YAML::Node propertyValueNode = propsIter->second;
-        if (!propertyValueNode.IsNull() && propertyValueNode.IsDefined())
-        {
-            std::string rawValueString = propertyValueNode.as<std::string>();
-            if (!processor->setProperty(propertyName, rawValueString))
-            {
-                _logger->log_warn("Received property %s with value %s but is 
not one of the properties for %s", propertyName.c_str(), 
rawValueString.c_str(), processor->getName().c_str());
-            }
-        }
-    }
-}
-
-void FlowController::load(ConfigFormat configFormat) {
-       if (_running) {
-               stop(true);
-       }
-       if (!_initialized) {
-               _logger->log_info("Load Flow Controller from file %s", 
_configurationFileName.c_str());
-
-               if (ConfigFormat::XML == configFormat) {
-                       _logger->log_info("Detected an XML configuration file 
for processing.");
-
-                       xmlDoc *doc = 
xmlReadFile(_configurationFileName.c_str(), NULL, XML_PARSE_NONET);
-                       if (doc == NULL) {
-                               _logger->log_error("xmlReadFile returned NULL 
when reading [%s]", _configurationFileName.c_str());
-                               _initialized = true;
-                               return;
-                       }
-
-                       xmlNode *root = xmlDocGetRootElement(doc);
-
-                       if (root == NULL) {
-                               _logger->log_error("Can not get root from XML 
doc %s", _configurationFileName.c_str());
-                               xmlFreeDoc(doc);
-                               xmlCleanupParser();
-                       }
-
-                       if (xmlStrcmp(root->name, BAD_CAST "flowController") != 
0) {
-                               _logger->log_error("Root name is not 
flowController for XML doc %s", _configurationFileName.c_str());
-                               xmlFreeDoc(doc);
-                               xmlCleanupParser();
-                               return;
-                       }
-
-                       xmlNode *currentNode;
-
-                       for (currentNode = root->xmlChildrenNode; currentNode 
!= NULL; currentNode = currentNode->next) {
-                               if (currentNode->type == XML_ELEMENT_NODE) {
-                                       if (xmlStrcmp(currentNode->name, 
BAD_CAST "rootGroup") == 0) {
-                                               
this->parseRootProcessGroup(doc, currentNode);
-                                       } else if (xmlStrcmp(currentNode->name, 
BAD_CAST "maxTimerDrivenThreadCount") == 0) {
-                                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                                               int64_t 
maxTimerDrivenThreadCount;
-                                               if (temp) {
-                                                       if 
(Property::StringToInt(temp, maxTimerDrivenThreadCount)) {
-                                                               
_logger->log_debug("maxTimerDrivenThreadCount => [%d]", 
maxTimerDrivenThreadCount);
-                                                               
this->_maxTimerDrivenThreads = maxTimerDrivenThreadCount;
-                                                       }
-                                                       xmlFree(temp);
-                                               }
-                                       } else if (xmlStrcmp(currentNode->name, 
BAD_CAST "maxEventDrivenThreadCount") == 0) {
-                                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
-                                               int64_t 
maxEventDrivenThreadCount;
-                                               if (temp) {
-                                                       if 
(Property::StringToInt(temp, maxEventDrivenThreadCount)) {
-                                                               
_logger->log_debug("maxEventDrivenThreadCount => [%d]", 
maxEventDrivenThreadCount);
-                                                               
this->_maxEventDrivenThreads = maxEventDrivenThreadCount;
-                                                       }
-                                                       xmlFree(temp);
-                                               }
-                                       }
-                               } // type == XML_ELEMENT_NODE
-                       } // for
-
-                       xmlFreeDoc(doc);
-                       xmlCleanupParser();
-                       _initialized = true;
-               } else if (ConfigFormat::YAML == configFormat) {
-                       YAML::Node flow = 
YAML::LoadFile(_configurationFileName);
-
-                       YAML::Node flowControllerNode = flow["Flow Controller"];
-                       YAML::Node processorsNode = 
flow[CONFIG_YAML_PROCESSORS_KEY];
-                       YAML::Node connectionsNode = flow["Connections"];
-                       YAML::Node remoteProcessingGroupNode = flow["Remote 
Processing Groups"];
-
-                       // Create the root process group
-                       parseRootProcessGroupYaml(flowControllerNode);
-                       parseProcessorNodeYaml(processorsNode, this->_root);
-                       parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, 
this->_root);
-                       parseConnectionYaml(&connectionsNode, this->_root);
-
-                       _initialized = true;
-               }
-       }
-}
-
-bool FlowController::start() {
-       if (!_initialized) {
-               _logger->log_error("Can not start Flow Controller because it 
has not been initialized");
-               return false;
-       } else {
-               if (!_running) {
-                       _logger->log_info("Start Flow Controller");
-                       this->_timerScheduler.start();
-                       if (this->_root)
-                               
this->_root->startProcessing(&this->_timerScheduler);
-                       _running = true;
-                       this->_protocol->start();
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp
deleted file mode 100644
index 2dda47a..0000000
--- a/src/FlowFileRecord.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * @file FlowFileRecord.cpp
- * Flow file record class implementation 
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <sys/time.h>
-#include <time.h>
-#include <iostream>
-#include <fstream>
-#include <cstdio>
-
-#include "FlowFileRecord.h"
-#include "Relationship.h"
-#include "Logger.h"
-
-std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
-
-FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, 
ResourceClaim *claim)
-: _size(0),
-  _id(_localFlowSeqNumber.load()),
-  _offset(0),
-  _penaltyExpirationMs(0),
-  _claim(claim),
-  _markedDelete(false),
-  _connection(NULL),
-  _orginalConnection(NULL)
-{
-       _entryDate = getTimeMillis();
-       _lineageStartDate = _entryDate;
-
-       char uuidStr[37];
-
-       // Generate the global UUID for the flow record
-       uuid_generate(_uuid);
-       // Increase the local ID for the flow record
-       ++_localFlowSeqNumber;
-       uuid_unparse(_uuid, uuidStr);
-       _uuidStr = uuidStr;
-
-       // Populate the default attributes
-    addAttribute(FILENAME, std::to_string(getTimeNano()));
-    addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
-    addAttribute(UUID, uuidStr);
-       // Populate the attributes from the input
-    std::map<std::string, std::string>::iterator it;
-    for (it = attributes.begin(); it!= attributes.end(); it++)
-    {
-       addAttribute(it->first, it->second);
-    }
-
-    _snapshot = false;
-
-       if (_claim)
-               // Increase the flow file record owned count for the resource 
claim
-               _claim->increaseFlowFileRecordOwnedCount();
-       _logger = Logger::getLogger();
-}
-
-FlowFileRecord::~FlowFileRecord()
-{
-       if (!_snapshot)
-               _logger->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str());
-       else
-               _logger->log_debug("Delete SnapShot FlowFile UUID %s", 
_uuidStr.c_str());
-       if (_claim)
-       {
-               // Decrease the flow file record owned count for the resource 
claim
-               _claim->decreaseFlowFileRecordOwnedCount();
-               if (_claim->getFlowFileRecordOwnedCount() == 0)
-               {
-                       _logger->log_debug("Delete Resource Claim %s", 
_claim->getContentFullPath().c_str());
-                       std::remove(_claim->getContentFullPath().c_str());
-                       delete _claim;
-               }
-       }
-}
-
-bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return addAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::addAttribute(std::string key, std::string value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               // attribute already there in the map
-               return false;
-       }
-       else
-       {
-               _attributes[key] = value;
-               return true;
-       }
-}
-
-bool FlowFileRecord::removeAttribute(FlowAttribute key)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return removeAttribute(keyString);
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::removeAttribute(std::string key)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               _attributes.erase(key);
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return updateAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::updateAttribute(std::string key, std::string value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               _attributes[key] = value;
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
-{
-       const char *keyStr = FlowAttributeKey(key);
-       if (keyStr)
-       {
-               std::string keyString = keyStr;
-               return getAttribute(keyString, value);
-       }
-       else
-       {
-               return false;
-       }
-}
-
-bool FlowFileRecord::getAttribute(std::string key, std::string &value)
-{
-       std::map<std::string, std::string>::iterator it = _attributes.find(key);
-       if (it != _attributes.end())
-       {
-               value = it->second;
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-void FlowFileRecord::duplicate(FlowFileRecord *original)
-{
-       uuid_copy(this->_uuid, original->_uuid);
-       this->_attributes = original->_attributes;
-       this->_entryDate = original->_entryDate;
-       this->_id = original->_id;
-       this->_lastQueueDate = original->_lastQueueDate;
-       this->_lineageStartDate = original->_lineageStartDate;
-       this->_offset = original->_offset;
-       this->_penaltyExpirationMs = original->_penaltyExpirationMs;
-       this->_size = original->_size;
-       this->_lineageIdentifiers = original->_lineageIdentifiers;
-       this->_orginalConnection = original->_orginalConnection;
-       this->_uuidStr = original->_uuidStr;
-       this->_connection = original->_connection;
-       this->_markedDelete = original->_markedDelete;
-
-       this->_claim = original->_claim;
-       if (this->_claim)
-               this->_claim->increaseFlowFileRecordOwnedCount();
-
-       this->_snapshot = true;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/src/GenerateFlowFile.cpp b/src/GenerateFlowFile.cpp
deleted file mode 100644
index 4b0603d..0000000
--- a/src/GenerateFlowFile.cpp
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * @file GenerateFlowFile.cpp
- * GenerateFlowFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-
-#include "GenerateFlowFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
-const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
-const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
-Property GenerateFlowFile::FileSize("File Size", "The size of the file that 
will be used", "1 kB");
-Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to 
be transferred in each invocation", "1");
-Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the 
data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
-Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
-               "If true, each FlowFile that is generated will be unique. If 
false, a random value will be generated and all FlowFiles", "true");
-Relationship GenerateFlowFile::Success("success", "success operational on the 
flow record");
-
-void GenerateFlowFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(FileSize);
-       properties.insert(BatchSize);
-       properties.insert(DataFormat);
-       properties.insert(UniqueFlowFiles);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void GenerateFlowFile::onTrigger(ProcessContext *context, ProcessSession 
*session)
-{
-       int64_t batchSize = 1;
-       bool uniqueFlowFile = true;
-       int64_t fileSize = 1024;
-
-       std::string value;
-       if (context->getProperty(FileSize.getName(), value))
-       {
-               Property::StringToInt(value, fileSize);
-       }
-       if (context->getProperty(BatchSize.getName(), value))
-       {
-               Property::StringToInt(value, batchSize);
-       }
-       if (context->getProperty(UniqueFlowFiles.getName(), value))
-       {
-               Property::StringToBool(value, uniqueFlowFile);
-       }
-
-       if (!uniqueFlowFile)
-       {
-               char *data;
-               data = new char[fileSize];
-               if (!data)
-                       return;
-               uint64_t dataSize = fileSize;
-               GenerateFlowFile::WriteCallback callback(data, dataSize);
-               char *current = data;
-               for (int i = 0; i < fileSize; i+= sizeof(int))
-               {
-                       int randValue = random();
-                       *((int *) current) = randValue;
-                       current += sizeof(int);
-               }
-               for (int i = 0; i < batchSize; i++)
-               {
-                       // For each batch
-                       FlowFileRecord *flowFile = session->create();
-                       if (!flowFile)
-                               return;
-                       if (fileSize > 0)
-                               session->write(flowFile, &callback);
-                       session->transfer(flowFile, Success);
-               }
-               delete[] data;
-       }
-       else
-       {
-               if (!_data)
-               {
-                       // We have not create the unique data yet
-                       _data = new char[fileSize];
-                       _dataSize = fileSize;
-                       char *current = _data;
-                       for (int i = 0; i < fileSize; i+= sizeof(int))
-                       {
-                               int randValue = random();
-                               *((int *) current) = randValue;
-                               // *((int *) current) = (0xFFFFFFFF & i);
-                               current += sizeof(int);
-                       }
-               }
-               GenerateFlowFile::WriteCallback callback(_data, _dataSize);
-               for (int i = 0; i < batchSize; i++)
-               {
-                       // For each batch
-                       FlowFileRecord *flowFile = session->create();
-                       if (!flowFile)
-                               return;
-                       if (fileSize > 0)
-                               session->write(flowFile, &callback);
-                       session->transfer(flowFile, Success);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/src/GetFile.cpp b/src/GetFile.cpp
deleted file mode 100644
index 02e196a..0000000
--- a/src/GetFile.cpp
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
- * @file GetFile.cpp
- * GetFile class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <time.h>
-#include <sstream>
-#include <stdio.h>
-#include <string>
-#include <iostream>
-#include <dirent.h>
-#include <limits.h>
-#include <unistd.h>
-#include <regex>
-
-#include "TimeUtil.h"
-#include "GetFile.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string GetFile::ProcessorName("GetFile");
-Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull 
in each iteration", "10");
-Property GetFile::Directory("Input Directory", "The input directory from which 
to pull files", ".");
-Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether 
or not hidden files should be ignored", "true");
-Property GetFile::KeepSourceFile("Keep Source File",
-               "If true, the file is not deleted after it has been copied to 
the Content Repository", "false");
-Property GetFile::MaxAge("Maximum File Age",
-               "The minimum age that a file must be in order to be pulled; any 
file younger than this amount of time (according to last modification date) 
will be ignored", "0 sec");
-Property GetFile::MinAge("Minimum File Age",
-               "The maximum age that a file must be in order to be pulled; any 
file older than this amount of time (according to last modification date) will 
be ignored", "0 sec");
-Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file 
can be in order to be pulled", "0 B");
-Property GetFile::MinSize("Minimum File Size", "The minimum size that a file 
must be in order to be pulled", "0 B");
-Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait 
before performing a directory listing", "0 sec");
-Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not 
to pull files from subdirectories", "true");
-Property GetFile::FileFilter("File Filter", "Only files whose names match the 
given regular expression will be picked up", "[^\\.].*");
-Relationship GetFile::Success("success", "All files are routed to success");
-
-void GetFile::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(BatchSize);
-       properties.insert(Directory);
-       properties.insert(IgnoreHiddenFile);
-       properties.insert(KeepSourceFile);
-       properties.insert(MaxAge);
-       properties.insert(MinAge);
-       properties.insert(MaxSize);
-       properties.insert(MinSize);
-       properties.insert(PollInterval);
-       properties.insert(Recurse);
-       properties.insert(FileFilter);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-}
-
-void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-       if (context->getProperty(Directory.getName(), value))
-       {
-               _directory = value;
-       }
-       if (context->getProperty(BatchSize.getName(), value))
-       {
-               Property::StringToInt(value, _batchSize);
-       }
-       if (context->getProperty(IgnoreHiddenFile.getName(), value))
-       {
-               Property::StringToBool(value, _ignoreHiddenFile);
-       }
-       if (context->getProperty(KeepSourceFile.getName(), value))
-       {
-               Property::StringToBool(value, _keepSourceFile);
-       }
-       if (context->getProperty(MaxAge.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _maxAge, unit) &&
-                       Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
-               {
-
-               }
-       }
-       if (context->getProperty(MinAge.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _minAge, unit) &&
-                       Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
-               {
-
-               }
-       }
-       if (context->getProperty(MaxSize.getName(), value))
-       {
-               Property::StringToInt(value, _maxSize);
-       }
-       if (context->getProperty(MinSize.getName(), value))
-       {
-               Property::StringToInt(value, _minSize);
-       }
-       if (context->getProperty(PollInterval.getName(), value))
-       {
-               TimeUnit unit;
-               if (Property::StringToTime(value, _pollInterval, unit) &&
-                       Property::ConvertTimeUnitToMS(_pollInterval, unit, 
_pollInterval))
-               {
-
-               }
-       }
-       if (context->getProperty(Recurse.getName(), value))
-       {
-               Property::StringToBool(value, _recursive);
-       }
-
-       if (context->getProperty(FileFilter.getName(), value))
-       {
-               _fileFilter = value;
-       }
-
-       // Perform directory list
-       if (isListingEmpty())
-       {
-               if (_pollInterval == 0 || (getTimeMillis() - 
_lastDirectoryListingTime) > _pollInterval)
-               {
-                       performListing(_directory);
-               }
-       }
-
-       if (!isListingEmpty())
-       {
-               try
-               {
-                       std::queue<std::string> list;
-                       pollListing(list, _batchSize);
-                       while (!list.empty())
-                       {
-                               std::string fileName = list.front();
-                               list.pop();
-                               _logger->log_info("GetFile process %s", 
fileName.c_str());
-                               FlowFileRecord *flowFile = session->create();
-                               if (!flowFile)
-                                       return;
-                               std::size_t found = 
fileName.find_last_of("/\\");
-                               std::string path = fileName.substr(0,found);
-                               std::string name = fileName.substr(found+1);
-                               flowFile->updateAttribute(FILENAME, name);
-                               flowFile->updateAttribute(PATH, path);
-                               flowFile->addAttribute(ABSOLUTE_PATH, fileName);
-                               session->import(fileName, flowFile, 
_keepSourceFile);
-                               session->transfer(flowFile, Success);
-                       }
-               }
-               catch (std::exception &exception)
-               {
-                       _logger->log_debug("GetFile Caught Exception %s", 
exception.what());
-                       throw;
-               }
-               catch (...)
-               {
-                       throw;
-               }
-       }
-}
-
-bool GetFile::isListingEmpty()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       return _dirList.empty();
-}
-
-void GetFile::putListing(std::string fileName)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _dirList.push(fileName);
-}
-
-void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize))
-       {
-               std::string fileName = _dirList.front();
-               _dirList.pop();
-               list.push(fileName);
-       }
-
-       return;
-}
-
-bool GetFile::acceptFile(std::string fileName)
-{
-       struct stat statbuf;
-
-       if (stat(fileName.c_str(), &statbuf) == 0)
-       {
-               if (_minSize > 0 && statbuf.st_size <_minSize)
-                       return false;
-
-               if (_maxSize > 0 && statbuf.st_size > _maxSize)
-                       return false;
-
-               uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-               uint64_t fileAge = getTimeMillis() - modifiedTime;
-               if (_minAge > 0 && fileAge < _minAge)
-                       return false;
-               if (_maxAge > 0 && fileAge > _maxAge)
-                       return false;
-
-               if (_ignoreHiddenFile && fileName.c_str()[0] == '.')
-                       return false;
-
-               if (access(fileName.c_str(), R_OK) != 0)
-                       return false;
-
-               if (_keepSourceFile == false && access(fileName.c_str(), W_OK) 
!= 0)
-                       return false;
-
-               try {
-                       std::regex re(_fileFilter);
-                       if (!std::regex_match(fileName, re)) {
-                               return false;
-                       }
-               } catch (std::regex_error e) {
-                       _logger->log_error("Invalid File Filter regex: %s.", 
e.what());
-                       return false;
-               }
-
-               return true;
-       }
-
-       return false;
-}
-
-void GetFile::performListing(std::string dir)
-{
-       DIR *d;
-       d = opendir(dir.c_str());
-       if (!d)
-               return;
-       while (1)
-       {
-               struct dirent *entry;
-               entry = readdir(d);
-               if (!entry)
-                       break;
-               std::string d_name = entry->d_name;
-               if ((entry->d_type & DT_DIR))
-               {
-                       // if this is a directory
-                       if (_recursive && strcmp(d_name.c_str(), "..") != 0 && 
strcmp(d_name.c_str(), ".") != 0)
-                       {
-                               std::string path = dir + "/" + d_name;
-                               performListing(path);
-                       }
-               }
-               else
-               {
-                       std::string fileName = dir + "/" + d_name;
-                       if (acceptFile(fileName))
-                       {
-                               // check whether we can take this file
-                               putListing(fileName);
-                       }
-               }
-       }
-       closedir(d);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp
deleted file mode 100644
index ace37d7..0000000
--- a/src/ListenSyslog.cpp
+++ /dev/null
@@ -1,342 +0,0 @@
-/**
- * @file ListenSyslog.cpp
- * ListenSyslog class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <queue>
-#include <stdio.h>
-#include <string>
-#include "TimeUtil.h"
-#include "ListenSyslog.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string ListenSyslog::ProcessorName("ListenSyslog");
-Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each 
buffer used to receive Syslog messages.", "65507 B");
-Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The 
maximum size of the socket buffer that should be used.", "1 MB");
-Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The 
maximum number of concurrent connections to accept Syslog messages in TCP 
mode.", "2");
-Property ListenSyslog::MaxBatchSize("Max Batch Size",
-               "The maximum number of Syslog events to add to a single 
FlowFile.", "1");
-Property ListenSyslog::MessageDelimiter("Message Delimiter",
-               "Specifies the delimiter to place between Syslog messages when 
multiple messages are bundled together (see <Max Batch Size> property).", "\n");
-Property ListenSyslog::ParseMessages("Parse Messages",
-               "Indicates if the processor should parse the Syslog messages. 
If set to false, each outgoing FlowFile will only.", "false");
-Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog 
communication.", "UDP");
-Property ListenSyslog::Port("Port", "The port for Syslog communication.", 
"514");
-Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
-
-void ListenSyslog::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(RecvBufSize);
-       properties.insert(MaxSocketBufSize);
-       properties.insert(MaxConnections);
-       properties.insert(MaxBatchSize);
-       properties.insert(MessageDelimiter);
-       properties.insert(ParseMessages);
-       properties.insert(Protocol);
-       properties.insert(Port);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       relationships.insert(Invalid);
-       setSupportedRelationships(relationships);
-}
-
-void ListenSyslog::startSocketThread()
-{
-       if (_thread != NULL)
-               return;
-
-       _logger->log_info("ListenSysLog Socket Thread Start");
-       _serverTheadRunning = true;
-       _thread = new std::thread(run, this);
-       _thread->detach();
-}
-
-void ListenSyslog::run(ListenSyslog *process)
-{
-       process->runThread();
-}
-
-void ListenSyslog::runThread()
-{
-       while (_serverTheadRunning)
-       {
-               if (_resetServerSocket)
-               {
-                       _resetServerSocket = false;
-                       // need to reset the socket
-                       std::vector<int>::iterator it;
-                       for (it = _clientSockets.begin(); it != 
_clientSockets.end(); ++it)
-                       {
-                               int clientSocket = *it;
-                               close(clientSocket);
-                       }
-                       _clientSockets.clear();
-                       if (_serverSocket > 0)
-                       {
-                               close(_serverSocket);
-                               _serverSocket = 0;
-                       }
-               }
-
-               if (_serverSocket <= 0)
-               {
-                       uint16_t portno = _port;
-                       struct sockaddr_in serv_addr;
-                       int sockfd;
-                       if (_protocol == "TCP")
-                               sockfd = socket(AF_INET, SOCK_STREAM, 0);
-                       else
-                               sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-                       if (sockfd < 0)
-                       {
-                               _logger->log_info("ListenSysLog Server socket 
creation failed");
-                               break;
-                       }
-                       bzero((char *) &serv_addr, sizeof(serv_addr));
-                       serv_addr.sin_family = AF_INET;
-                       serv_addr.sin_addr.s_addr = INADDR_ANY;
-                       serv_addr.sin_port = htons(portno);
-                       if (bind(sockfd, (struct sockaddr *) &serv_addr,
-                                       sizeof(serv_addr)) < 0)
-                       {
-                               _logger->log_error("ListenSysLog Server socket 
bind failed");
-                               break;
-                       }
-                       if (_protocol == "TCP")
-                               listen(sockfd,5);
-                       _serverSocket = sockfd;
-                       _logger->log_error("ListenSysLog Server socket %d bind 
OK to port %d", _serverSocket, portno);
-               }
-               FD_ZERO(&_readfds);
-               FD_SET(_serverSocket, &_readfds);
-               _maxFds = _serverSocket;
-               std::vector<int>::iterator it;
-               for (it = _clientSockets.begin(); it != _clientSockets.end(); 
++it)
-               {
-                       int clientSocket = *it;
-                       if (clientSocket >= _maxFds)
-                               _maxFds = clientSocket;
-                       FD_SET(clientSocket, &_readfds);
-               }
-               fd_set fds;
-               struct timeval tv;
-               int retval;
-               fds = _readfds;
-               tv.tv_sec = 0;
-               // 100 msec
-               tv.tv_usec = 100000;
-               retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
-               if (retval < 0)
-                       break;
-               if (retval == 0)
-                       continue;
-               if (FD_ISSET(_serverSocket, &fds))
-               {
-                       // server socket, either we have UDP datagram or TCP 
connection request
-                       if (_protocol == "TCP")
-                       {
-                               socklen_t clilen;
-                               struct sockaddr_in cli_addr;
-                               clilen = sizeof(cli_addr);
-                               int newsockfd = accept(_serverSocket,
-                                               (struct sockaddr *) &cli_addr,
-                                               &clilen);
-                               if (newsockfd > 0)
-                               {
-                                       if (_clientSockets.size() < 
_maxConnections)
-                                       {
-                                               
_clientSockets.push_back(newsockfd);
-                                               _logger->log_info("ListenSysLog 
new client socket %d connection", newsockfd);
-                                               continue;
-                                       }
-                                       else
-                                       {
-                                               close(newsockfd);
-                                       }
-                               }
-                       }
-                       else
-                       {
-                               socklen_t clilen;
-                               struct sockaddr_in cli_addr;
-                               clilen = sizeof(cli_addr);
-                               int recvlen = recvfrom(_serverSocket, _buffer, 
sizeof(_buffer), 0,
-                                               (struct sockaddr *)&cli_addr, 
&clilen);
-                               if (recvlen > 0 && (recvlen + 
getEventQueueByteSize()) <= _recvBufSize)
-                               {
-                                       uint8_t *payload = new uint8_t[recvlen];
-                                       memcpy(payload, _buffer, recvlen);
-                                       putEvent(payload, recvlen);
-                               }
-                       }
-               }
-               it = _clientSockets.begin();
-               while (it != _clientSockets.end())
-               {
-                       int clientSocket = *it;
-                       if (FD_ISSET(clientSocket, &fds))
-                       {
-                               int recvlen = readline(clientSocket, (char 
*)_buffer, sizeof(_buffer));
-                               if (recvlen <= 0)
-                               {
-                                       close(clientSocket);
-                                       _logger->log_info("ListenSysLog client 
socket %d close", clientSocket);
-                                       it = _clientSockets.erase(it);
-                               }
-                               else
-                               {
-                                       if ((recvlen + getEventQueueByteSize()) 
<= _recvBufSize)
-                                       {
-                                               uint8_t *payload = new 
uint8_t[recvlen];
-                                               memcpy(payload, _buffer, 
recvlen);
-                                               putEvent(payload, recvlen);
-                                       }
-                                       ++it;
-                               }
-                       }
-               }
-       }
-       return;
-}
-
-
-int ListenSyslog::readline( int fd, char *bufptr, size_t len )
-{
-       char *bufx = bufptr;
-       static char *bp;
-       static int cnt = 0;
-       static char b[ 2048 ];
-       char c;
-
-       while ( --len > 0 )
-    {
-      if ( --cnt <= 0 )
-      {
-         cnt = recv( fd, b, sizeof( b ), 0 );
-         if ( cnt < 0 )
-         {
-                 if ( errno == EINTR )
-                 {
-                         len++;                /* the while will decrement */
-                         continue;
-                 }
-                 return -1;
-         }
-         if ( cnt == 0 )
-                 return 0;
-         bp = b;
-      }
-      c = *bp++;
-      *bufptr++ = c;
-      if ( c == '\n' )
-      {
-         *bufptr = '\n';
-         return bufptr - bufx + 1;
-      }
-    }
-       return -1;
-}
-
-void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       std::string value;
-       bool needResetServerSocket = false;
-       if (context->getProperty(Protocol.getName(), value))
-       {
-               if (_protocol != value)
-                       needResetServerSocket = true;
-               _protocol = value;
-       }
-       if (context->getProperty(RecvBufSize.getName(), value))
-       {
-               Property::StringToInt(value, _recvBufSize);
-       }
-       if (context->getProperty(MaxSocketBufSize.getName(), value))
-       {
-               Property::StringToInt(value, _maxSocketBufSize);
-       }
-       if (context->getProperty(MaxConnections.getName(), value))
-       {
-               Property::StringToInt(value, _maxConnections);
-       }
-       if (context->getProperty(MessageDelimiter.getName(), value))
-       {
-               _messageDelimiter = value;
-       }
-       if (context->getProperty(ParseMessages.getName(), value))
-       {
-               Property::StringToBool(value, _parseMessages);
-       }
-       if (context->getProperty(Port.getName(), value))
-       {
-               int64_t oldPort = _port;
-               Property::StringToInt(value, _port);
-               if (_port != oldPort)
-                       needResetServerSocket = true;
-       }
-       if (context->getProperty(MaxBatchSize.getName(), value))
-       {
-               Property::StringToInt(value, _maxBatchSize);
-       }
-
-       if (needResetServerSocket)
-               _resetServerSocket = true;
-
-       startSocketThread();
-
-       // read from the event queue
-       if (isEventQueueEmpty())
-       {
-               context->yield();
-               return;
-       }
-
-       std::queue<SysLogEvent> eventQueue;
-       pollEvent(eventQueue, _maxBatchSize);
-       bool firstEvent = true;
-       FlowFileRecord *flowFile = NULL;
-       while(!eventQueue.empty())
-       {
-               SysLogEvent event = eventQueue.front();
-               eventQueue.pop();
-               if (firstEvent)
-               {
-                       flowFile = session->create();
-                       if (!flowFile)
-                               return;
-                       ListenSyslog::WriteCallback callback((char 
*)event.payload, event.len);
-                       session->write(flowFile, &callback);
-                       delete[] event.payload;
-                       firstEvent = false;
-               }
-               else
-               {
-                       ListenSyslog::WriteCallback callback((char 
*)event.payload, event.len);
-                       session->append(flowFile, &callback);
-                       delete[] event.payload;
-               }
-       }
-       flowFile->addAttribute("syslog.protocol", _protocol);
-       flowFile->addAttribute("syslog.port", std::to_string(_port));
-       session->transfer(flowFile, Success);
-}

Reply via email to