http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp
deleted file mode 100644
index 82130f8..0000000
--- a/src/LogAttribute.cpp
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * @file LogAttribute.cpp
- * LogAttribute class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <sstream>
-#include <string.h>
-#include <iostream>
-
-#include "TimeUtil.h"
-#include "LogAttribute.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-//! Static name, properties and relationship of the LogAttribute processor.
-//! Long descriptions are split into adjacent literals, which the compiler
-//! concatenates, so no literal spans a physical line break.
-const std::string LogAttribute::ProcessorName("LogAttribute");
-Property LogAttribute::LogLevel("Log Level",
-               "The Log Level to use when logging the Attributes", "info");
-Property LogAttribute::AttributesToLog("Attributes to Log",
-               "A comma-separated list of Attributes to Log. "
-               "If not specified, all attributes will be logged.", "");
-Property LogAttribute::AttributesToIgnore("Attributes to Ignore",
-               "A comma-separated list of Attributes to ignore. "
-               "If not specified, no attributes will be ignored.", "");
-Property LogAttribute::LogPayload("Log Payload",
-               "If true, the FlowFile's payload will be logged, in addition to "
-               "its attributes; otherwise, just the Attributes will be logged.",
-               "false");
-Property LogAttribute::LogPrefix("Log prefix",
-               "Log prefix appended to the log lines. It helps to distinguish "
-               "the output of multiple LogAttribute processors.", "");
-Relationship LogAttribute::Success("success",
-               "success operational on the flow record");
-
-/**
- * Registers the properties and the relationship supported by LogAttribute.
- */
-void LogAttribute::initialize()
-{
-       //! Supported properties
-       std::set<Property> supportedProperties = {
-               LogLevel,
-               AttributesToLog,
-               AttributesToIgnore,
-               LogPayload,
-               LogPrefix
-       };
-       setSupportedProperties(supportedProperties);
-
-       //! Supported relationships
-       std::set<Relationship> supportedRelationships = { Success };
-       setSupportedRelationships(supportedRelationships);
-}
-
-/**
- * Builds one log message describing the next FlowFile of the session (standard
- * fields, attributes, content claim and, optionally, a hex dump of the
- * payload), emits it at the configured level and transfers the FlowFile to
- * the Success relationship. Returns without logging when the session has no
- * FlowFile.
- *
- * @param context source of the Log Level, Log prefix and Log Payload properties
- * @param session source of the FlowFile; also used to read and transfer it
- */
-void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session)
-{
-       FlowFileRecord *flow = session->get();
-       if (!flow)
-               return;
-
-       std::string dashLine =
-               "--------------------------------------------------";
-       LogAttrLevel level = LogAttrLevelInfo;
-       bool logPayload = false;
-
-       std::string value;
-       if (context->getProperty(LogLevel.getName(), value))
-               logLevelStringToEnum(value, level);
-       if (context->getProperty(LogPrefix.getName(), value))
-               dashLine = "-----" + value + "-----";
-       if (context->getProperty(LogPayload.getName(), value))
-               Property::StringToBool(value, logPayload);
-
-       std::ostringstream message;
-       message << "Logging for flow file " << "\n";
-       message << dashLine;
-       message << "\nStandard FlowFile Attributes";
-       message << "\n" << "UUID:" << flow->getUUIDStr();
-       message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate());
-       message << "\n" << "lineageStartDate:"
-               << getTimeStr(flow->getlineageStartDate());
-       message << "\n" << "Size:" << flow->getSize()
-               << " Offset:" << flow->getOffset();
-
-       message << "\nFlowFile Attributes Map Content";
-       const std::map<std::string, std::string> attrs = flow->getAttributes();
-       for (std::map<std::string, std::string>::const_iterator it = attrs.begin();
-                       it != attrs.end(); ++it)
-       {
-               message << "\n" << "key:" << it->first << " value:" << it->second;
-       }
-
-       message << "\nFlowFile Resource Claim Content";
-       ResourceClaim *claim = flow->getResourceClaim();
-       if (claim)
-               message << "\n" << "Content Claim:" << claim->getContentFullPath();
-
-       // ProcessSession::read() throws when the FlowFile has no content claim,
-       // which would abort the trigger before the transfer below. Only dump
-       // the payload when there is a claim (presumably the same pointer that
-       // read() checks - confirm against FlowFileRecord). Payloads above 1 MiB
-       // are not dumped.
-       if (logPayload && claim && flow->getSize() <= 1024*1024)
-       {
-               message << "\n" << "Payload:" << "\n";
-               ReadCallback callback(flow->getSize());
-               session->read(flow, &callback);
-               // Hex dump, 16 bytes per line
-               for (unsigned int i = 0, column = 0; i < callback._readSize; i++)
-               {
-                       char hexByte[8];
-                       snprintf(hexByte, sizeof(hexByte), "%02x ",
-                                       static_cast<unsigned char>(callback._buffer[i]));
-                       message << hexByte;
-                       if (++column == 16)
-                       {
-                               message << '\n';
-                               column = 0;
-                       }
-               }
-       }
-       // No std::ends here: it would store a NUL character inside the string.
-       message << "\n" << dashLine;
-       const std::string output = message.str();
-
-       switch (level)
-       {
-       case LogAttrLevelInfo:
-               _logger->log_info("%s", output.c_str());
-               break;
-       case LogAttrLevelDebug:
-               _logger->log_debug("%s", output.c_str());
-               break;
-       case LogAttrLevelError:
-               _logger->log_error("%s", output.c_str());
-               break;
-       case LogAttrLevelTrace:
-               _logger->log_trace("%s", output.c_str());
-               break;
-       case LogAttrLevelWarn:
-               _logger->log_warn("%s", output.c_str());
-               break;
-       default:
-               break;
-       }
-
-       // Transfer to the relationship
-       session->transfer(flow, Success);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/src/Logger.cpp b/src/Logger.cpp
deleted file mode 100644
index 984f609..0000000
--- a/src/Logger.cpp
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * @file Logger.cpp
- * Logger class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-
-#include "Logger.h"
-
-//! Definition of the static Logger::_logger pointer; it starts out as NULL.
-Logger *Logger::_logger = NULL;
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp
deleted file mode 100644
index 70ee9d7..0000000
--- a/src/ProcessGroup.cpp
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * @file ProcessGroup.cpp
- * ProcessGroup class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-
-#include "ProcessGroup.h"
-#include "Processor.h"
-
-/**
- * Creates a process group.
- *
- * @param type   kind of group (compared against ROOT_PROCESS_GROUP elsewhere)
- * @param name   group name, used in log messages
- * @param uuid   UUID to copy; when null a new UUID is generated
- * @param parent parent process group pointer, stored as given
- */
-ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name,
-               uuid_t uuid, ProcessGroup *parent)
-: _name(name),
-  _type(type),
-  _parentProcessGroup(parent)
-{
-       _yieldPeriodMsec = 0;
-       _transmitting = false;
-
-       // Adopt the caller's UUID when one is given, otherwise generate one
-       if (uuid)
-               uuid_copy(_uuid, uuid);
-       else
-               uuid_generate(_uuid);
-
-       _logger = Logger::getLogger();
-       _logger->log_info("ProcessGroup %s created", _name.c_str());
-}
-
-/**
- * Drains and deletes every connection, then deletes the child process groups
- * and the processors held by this group.
- */
-ProcessGroup::~ProcessGroup()
-{
-       for (Connection *connection : _connections)
-       {
-               connection->drain();
-               delete connection;
-       }
-
-       for (ProcessGroup *childGroup : _childProcessGroups)
-               delete childGroup;
-
-       for (Processor *processor : _processors)
-               delete processor;
-}
-
-/**
- * @return true when this group's type is ROOT_PROCESS_GROUP (read under _mtx)
- */
-bool ProcessGroup::isRootProcessGroup()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-       const bool isRoot = (_type == ROOT_PROCESS_GROUP);
-       return isRoot;
-}
-
-/**
- * Adds a processor to this group unless it is already a member.
- *
- * @param processor processor to add
- */
-void ProcessGroup::addProcessor(Processor *processor)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       // insert() reports through .second whether the processor was new
-       if (_processors.insert(processor).second)
-       {
-               _logger->log_info("Add processor %s into process group %s",
-                               processor->getName().c_str(), _name.c_str());
-       }
-}
-
-/**
- * Removes a processor from this group when it is a member; the processor
- * itself is not deleted here.
- *
- * @param processor processor to remove
- */
-void ProcessGroup::removeProcessor(Processor *processor)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       // erase() returns the number of elements removed (0 when not a member)
-       if (_processors.erase(processor) > 0)
-       {
-               _logger->log_info("Remove processor %s from process group %s",
-                               processor->getName().c_str(), _name.c_str());
-       }
-}
-
-/**
- * Adds a child process group unless it is already registered.
- *
- * @param child child process group to add
- */
-void ProcessGroup::addProcessGroup(ProcessGroup *child)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       // Only log when the child was not in this process group yet
-       if (_childProcessGroups.insert(child).second)
-       {
-               _logger->log_info("Add child process group %s into process group %s",
-                               child->getName().c_str(), _name.c_str());
-       }
-}
-
-/**
- * Removes a child process group when it is registered; the child itself is
- * not deleted here.
- *
- * @param child child process group to remove
- */
-void ProcessGroup::removeProcessGroup(ProcessGroup *child)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       // Only log when the child really was in this process group
-       if (_childProcessGroups.erase(child) > 0)
-       {
-               _logger->log_info("Remove child process group %s from process group %s",
-                               child->getName().c_str(), _name.c_str());
-       }
-}
-
-/**
- * Schedules every TIMER_DRIVEN processor of this group that is neither
- * running nor DISABLED, then does the same for each child process group.
- * Exceptions are logged at debug level and rethrown.
- *
- * @param timeScheduler scheduling agent the processors are handed to
- */
-void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       try
-       {
-               // Start all the processor node, input and output ports
-               for (Processor *processor : _processors)
-               {
-                       if (processor->isRunning())
-                               continue;
-                       if (processor->getScheduledState() == DISABLED)
-                               continue;
-                       if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-                               timeScheduler->schedule(processor);
-               }
-
-               for (ProcessGroup *childGroup : _childProcessGroups)
-                       childGroup->startProcessing(timeScheduler);
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process group start processing");
-               throw;
-       }
-}
-
-/**
- * Unschedules every TIMER_DRIVEN processor of this group, then does the same
- * for each child process group. Exceptions are logged at debug level and
- * rethrown.
- *
- * @param timeScheduler scheduling agent the processors are removed from
- */
-void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       try
-       {
-               // Stop all the processor node, input and output ports
-               for (Processor *processor : _processors)
-               {
-                       if (processor->getSchedulingStrategy() == TIMER_DRIVEN)
-                               timeScheduler->unschedule(processor);
-               }
-
-               for (ProcessGroup *childGroup : _childProcessGroups)
-                       childGroup->stopProcessing(timeScheduler);
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process group stop processing");
-               throw;
-       }
-}
-
-/**
- * Looks up a processor by UUID in this group first and then, depth first, in
- * the child process groups. Does not take _mtx itself.
- *
- * @param uuid UUID to look for
- * @return the matching processor, or NULL when none is found
- */
-Processor *ProcessGroup::findProcessor(uuid_t uuid)
-{
-       for (Processor *candidate : _processors)
-       {
-               uuid_t candidateUUID;
-               if (!candidate->getUUID(candidateUUID))
-                       continue;
-               if (uuid_compare(candidateUUID, uuid) == 0)
-                       return candidate;
-       }
-
-       for (ProcessGroup *childGroup : _childProcessGroups)
-       {
-               Processor *found = childGroup->findProcessor(uuid);
-               if (found)
-                       return found;
-       }
-
-       return NULL;
-}
-
-/**
- * Looks up a processor by name in this group first and then, depth first, in
- * the child process groups. Every processor inspected in this group is logged
- * at debug level.
- *
- * @param processorName name to look for
- * @return the first matching processor, or NULL when none is found
- */
-Processor *ProcessGroup::findProcessor(std::string processorName)
-{
-       for (Processor *candidate : _processors)
-       {
-               _logger->log_debug("Current processor is %s",
-                               candidate->getName().c_str());
-               if (candidate->getName() == processorName)
-                       return candidate;
-       }
-
-       for (ProcessGroup *childGroup : _childProcessGroups)
-       {
-               Processor *found = childGroup->findProcessor(processorName);
-               if (found)
-                       return found;
-       }
-
-       return NULL;
-}
-
-/**
- * Sets a property on every processor with the given name, in this group and
- * recursively in all child process groups.
- *
- * @param processorName name of the processor(s) to update
- * @param propertyName  property to set
- * @param propertyValue value to assign
- */
-void ProcessGroup::updatePropertyValue(std::string processorName,
-               std::string propertyName, std::string propertyValue)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       for (Processor *processor : _processors)
-       {
-               if (processor->getName() != processorName)
-                       continue;
-               processor->setProperty(propertyName, propertyValue);
-       }
-
-       for (ProcessGroup *childGroup : _childProcessGroups)
-       {
-               childGroup->updatePropertyValue(processorName, propertyName,
-                               propertyValue);
-       }
-}
-
-/**
- * Registers a connection with this group and attaches it to its source and
- * destination processors when they can be found by UUID. A connection that
- * is already registered is ignored.
- *
- * @param connection connection to add
- */
-void ProcessGroup::addConnection(Connection *connection)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_connections.find(connection) != _connections.end())
-               return;
-
-       // We do not have the same connection in this process group yet
-       _connections.insert(connection);
-       _logger->log_info("Add connection %s into process group %s",
-                       connection->getName().c_str(), _name.c_str());
-
-       uuid_t sourceUUID;
-       connection->getSourceProcessorUUID(sourceUUID);
-       Processor *source = this->findProcessor(sourceUUID);
-       if (source)
-               source->addConnection(connection);
-
-       uuid_t destinationUUID;
-       connection->getDestinationProcessorUUID(destinationUUID);
-       Processor *destination = this->findProcessor(destinationUUID);
-       // A self-loop is attached only once, through the source
-       if (destination && destination != source)
-               destination->addConnection(connection);
-}
-
-/**
- * Unregisters a connection from this group and detaches it from its source
- * and destination processors when they can be found by UUID. A connection
- * that is not registered is ignored; the connection is not deleted here.
- *
- * @param connection connection to remove
- */
-void ProcessGroup::removeConnection(Connection *connection)
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_connections.find(connection) == _connections.end())
-               return;
-
-       // The connection is registered in this process group: drop it
-       _connections.erase(connection);
-       _logger->log_info("Remove connection %s from process group %s",
-                       connection->getName().c_str(), _name.c_str());
-
-       uuid_t sourceUUID;
-       connection->getSourceProcessorUUID(sourceUUID);
-       Processor *source = this->findProcessor(sourceUUID);
-       if (source)
-               source->removeConnection(connection);
-
-       uuid_t destinationUUID;
-       connection->getDestinationProcessorUUID(destinationUUID);
-       Processor *destination = this->findProcessor(destinationUUID);
-       // A self-loop is detached only once, through the source
-       if (destination && destination != source)
-               destination->removeConnection(connection);
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
deleted file mode 100644
index 4f526c3..0000000
--- a/src/ProcessSession.cpp
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
- * @file ProcessSession.cpp
- * ProcessSession class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <iostream>
-
-#include "ProcessSession.h"
-
-/**
- * Creates a FlowFile from an empty attribute map and records it in
- * _addedFlowFiles under its UUID string.
- *
- * @return the new FlowFile record
- */
-FlowFileRecord* ProcessSession::create()
-{
-       std::map<std::string, std::string> noAttributes;
-       FlowFileRecord *created = new FlowFileRecord(noAttributes);
-       if (!created)
-               return created;
-
-       _addedFlowFiles[created->getUUIDStr()] = created;
-       _logger->log_debug("Create FlowFile with UUID %s",
-                       created->getUUIDStr().c_str());
-       return created;
-}
-
-/**
- * Creates a FlowFile derived from a parent: the parent's attributes are
- * copied except ALTERNATE_IDENTIFIER, DISCARD_REASON and UUID, and the lineage
- * start date and identifiers are inherited, with the parent's UUID appended.
- *
- * @param parent FlowFile to derive from
- * @return the new FlowFile record
- */
-FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
-{
-       FlowFileRecord *child = this->create();
-       if (!child)
-               return child;
-
-       // Copy attributes
-       std::map<std::string, std::string> inherited = parent->getAttributes();
-       for (std::map<std::string, std::string>::iterator attr = inherited.begin();
-                       attr != inherited.end(); ++attr)
-       {
-               // Do not copy special attributes from parent
-               const bool isSpecial =
-                       attr->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
-                       attr->first == FlowAttributeKey(DISCARD_REASON) ||
-                       attr->first == FlowAttributeKey(UUID);
-               if (!isSpecial)
-                       child->setAttribute(attr->first, attr->second);
-       }
-
-       child->_lineageStartDate = parent->_lineageStartDate;
-       child->_lineageIdentifiers = parent->_lineageIdentifiers;
-       child->_lineageIdentifiers.insert(parent->_uuidStr);
-       return child;
-}
-
-/**
- * Creates a child FlowFile of the parent that shares the parent's resource
- * claim, offset and size.
- *
- * @param parent FlowFile to clone
- * @return the new FlowFile record
- */
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
-{
-       FlowFileRecord *copy = this->create(parent);
-       if (!copy)
-               return copy;
-
-       // Copy Resource Claim
-       copy->_claim = parent->_claim;
-       if (!copy->_claim)
-               return copy;
-
-       copy->_offset = parent->_offset;
-       copy->_size = parent->_size;
-       copy->_claim->increaseFlowFileRecordOwnedCount();
-       return copy;
-}
-
-/**
- * Clones a FlowFile while it is being transferred. The clone is recorded in
- * _clonedFlowFiles; it inherits the parent's attributes (except
- * ALTERNATE_IDENTIFIER, DISCARD_REASON and UUID), lineage and resource claim.
- *
- * @param parent FlowFile to clone
- * @return the new FlowFile record
- */
-FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
-{
-       std::map<std::string, std::string> noAttributes;
-       FlowFileRecord *record = new FlowFileRecord(noAttributes);
-       if (!record)
-               return record;
-
-       this->_clonedFlowFiles[record->getUUIDStr()] = record;
-       _logger->log_debug("Clone FlowFile with UUID %s during transfer",
-                       record->getUUIDStr().c_str());
-
-       // Copy attributes
-       std::map<std::string, std::string> parentAttributes = parent->getAttributes();
-       for (std::map<std::string, std::string>::iterator it = parentAttributes.begin();
-                       it != parentAttributes.end(); ++it)
-       {
-               // Do not copy special attributes from parent
-               const bool isSpecial =
-                       it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
-                       it->first == FlowAttributeKey(DISCARD_REASON) ||
-                       it->first == FlowAttributeKey(UUID);
-               if (!isSpecial)
-                       record->setAttribute(it->first, it->second);
-       }
-       record->_lineageStartDate = parent->_lineageStartDate;
-       record->_lineageIdentifiers = parent->_lineageIdentifiers;
-       record->_lineageIdentifiers.insert(parent->_uuidStr);
-
-       // Copy Resource Claim
-       record->_claim = parent->_claim;
-       if (record->_claim)
-       {
-               record->_offset = parent->_offset;
-               record->_size = parent->_size;
-               record->_claim->increaseFlowFileRecordOwnedCount();
-       }
-
-       return record;
-}
-
-/**
- * Creates a child FlowFile that covers a sub-range of the parent's content.
- *
- * When the parent has no resource claim the child is returned without one.
- * When the requested range is negative or does not fit inside the parent's
- * size, the child is removed from _addedFlowFiles, deleted, and NULL is
- * returned.
- *
- * @param parent FlowFile to clone
- * @param offset start of the range, relative to the parent's own offset
- * @param size   length of the range
- * @return the new FlowFile record, or NULL when the range is invalid
- */
-FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset,
-               long size)
-{
-       FlowFileRecord *record = this->create(parent);
-       if (!record || !parent->_claim)
-               return record;
-
-       // Compare as "size > parentSize - offset" so offset + size cannot
-       // overflow; negative values can never describe a valid range.
-       const long parentSize = static_cast<long>(parent->_size);
-       if (offset < 0 || size < 0 || offset > parentSize ||
-                       size > parentSize - offset)
-       {
-               // NOTE(review): assumes the logger takes printf-style formats
-               _logger->log_error("clone offset %ld and size %ld exceed parent size %llu",
-                               offset, size,
-                               static_cast<unsigned long long>(parent->_size));
-               // Remove the Add FlowFile for the session
-               this->_addedFlowFiles.erase(record->getUUIDStr());
-               delete record;
-               return NULL;
-       }
-
-       // The child's range starts "offset" bytes into the parent's range
-       record->_offset = parent->_offset + offset;
-       record->_size = size;
-       // Copy Resource Claim
-       record->_claim = parent->_claim;
-       record->_claim->increaseFlowFileRecordOwnedCount();
-       return record;
-}
-
-/**
- * Marks a FlowFile as deleted and records it in _deletedFlowFiles under its
- * UUID string.
- *
- * @param flow FlowFile to remove
- */
-void ProcessSession::remove(FlowFileRecord *flow)
-{
-       flow->_markedDelete = true;
-
-       const std::string uuid = flow->getUUIDStr();
-       _deletedFlowFiles[uuid] = flow;
-}
-
-/**
- * Sets an attribute on a FlowFile by forwarding to FlowFileRecord::setAttribute.
- *
- * @param flow  FlowFile to modify
- * @param key   attribute name
- * @param value attribute value
- */
-void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key,
-               std::string value)
-{
-       flow->setAttribute(key, value);
-}
-
-/**
- * Removes an attribute from a FlowFile by forwarding to
- * FlowFileRecord::removeAttribute.
- *
- * @param flow FlowFile to modify
- * @param key  attribute name
- */
-void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
-{
-       flow->removeAttribute(key);
-}
-
-/**
- * Sets the FlowFile's penalty expiration to the current time plus the
- * penalization period of the processor that owns this session's context.
- *
- * @param flow FlowFile to penalize
- */
-void ProcessSession::penalize(FlowFileRecord *flow)
-{
-       const auto penaltyMsec =
-               this->_processContext->getProcessor()->getPenalizationPeriodMsec();
-       flow->_penaltyExpirationMs = getTimeMillis() + penaltyMsec;
-}
-
-/**
- * Records, keyed by the FlowFile's UUID string, the relationship the FlowFile
- * is to be transferred to.
- *
- * @param flow         FlowFile being routed
- * @param relationship relationship to associate with it
- */
-void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
-{
-       const std::string uuid = flow->getUUIDStr();
-       _transferRelationship[uuid] = relationship;
-}
-
-/**
- * Replaces the content of a FlowFile. A new ResourceClaim is created, the
- * callback writes into its file, and on success the FlowFile points at the
- * new claim with offset 0 and the written size; its count on the previous
- * claim, if any, is decreased.
- *
- * On any exception the new claim is deleted, the error is logged at debug
- * level and the exception is rethrown.
- *
- * @param flow     FlowFile whose content is written
- * @param callback invoked with the open output stream to produce the content
- */
-void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
-{
-       ResourceClaim *claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
-
-       try
-       {
-               std::ofstream fs;
-               fs.open(claim->getContentFullPath().c_str(),
-                               std::fstream::out | std::fstream::binary | std::fstream::trunc);
-               if (!fs.is_open())
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-
-               // Call the callback to write the content
-               callback->process(&fs);
-               if (!(fs.good() && fs.tellp() >= 0))
-               {
-                       fs.close();
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-               }
-
-               flow->_size = fs.tellp();
-               flow->_offset = 0;
-               if (flow->_claim)
-               {
-                       // Remove the old claim
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               flow->_claim = claim;
-               claim->increaseFlowFileRecordOwnedCount();
-               fs.close();
-       }
-       catch (std::exception &exception)
-       {
-               // Detach the new claim from the FlowFile before deleting it
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               // Detach the new claim from the FlowFile before deleting it
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               _logger->log_debug("Caught Exception during process session write");
-               throw;
-       }
-}
-
-/**
- * Appends content to a FlowFile. When the FlowFile has no claim yet this is
- * the same as write(); otherwise the claim's file is opened in append mode,
- * the callback writes to it, and the FlowFile's size grows by the number of
- * bytes written.
- *
- * Exceptions are logged at debug level and rethrown.
- *
- * @param flow     FlowFile whose content is extended
- * @param callback invoked with the open output stream to produce the content
- */
-void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
-{
-       if (flow->_claim == NULL)
-       {
-               // No existed claim for append, we need to create new claim
-               write(flow, callback);
-               return;
-       }
-
-       ResourceClaim *claim = flow->_claim;
-
-       try
-       {
-               std::ofstream fs;
-               fs.open(claim->getContentFullPath().c_str(),
-                               std::fstream::out | std::fstream::binary | std::fstream::app);
-               if (!fs.is_open())
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
-
-               // Call the callback to write the content
-               std::streampos oldPos = fs.tellp();
-               callback->process(&fs);
-               if (!(fs.good() && fs.tellp() >= 0))
-               {
-                       fs.close();
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
-               }
-
-               // Grow the FlowFile by what the callback added
-               uint64_t appendSize = fs.tellp() - oldPos;
-               flow->_size += appendSize;
-               fs.close();
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process session append");
-               throw;
-       }
-}
-
-void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
-{
-       try
-       {
-               ResourceClaim *claim = NULL;
-               if (flow->_claim == NULL)
-               {
-                       // No existed claim for read, we throw exception
-                       throw Exception(FILE_OPERATION_EXCEPTION, "No Content 
Claim existed for read");
-               }
-
-               claim = flow->_claim;
-               std::ifstream fs;
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::in | 
std::fstream::binary);
-               if (fs.is_open())
-               {
-                       fs.seekg(flow->_offset, fs.beg);
-
-                       if (fs.good())
-                       {
-                               callback->process(&fs);
-                               /*
-                               _logger->log_debug("Read offset %d size %d 
content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-                               fs.close();
-                       }
-                       else
-                       {
-                               fs.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Read Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
-               }
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process session 
read");
-               throw;
-       }
-}
-
-void ProcessSession::import(std::string source, FlowFileRecord *flow, bool 
keepSource, uint64_t offset)
-{
-       ResourceClaim *claim = NULL;
-
-       claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
-       char *buf = NULL;
-       int size = 4096;
-       buf = new char [size];
-
-       try
-       {
-               std::ofstream fs;
-               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
-               std::ifstream input;
-               input.open(source.c_str(), std::fstream::in | 
std::fstream::binary);
-
-               if (fs.is_open() && input.is_open())
-               {
-                       // Open the source file and stream to the flow file
-                       input.seekg(offset, fs.beg);
-                       while (input.good())
-                       {
-                               input.read(buf, size);
-                               if (input)
-                                       fs.write(buf, size);
-                               else
-                                       fs.write(buf, input.gcount());
-                       }
-
-                       if (fs.good() && fs.tellp() >= 0)
-                       {
-                               flow->_size = fs.tellp();
-                               flow->_offset = 0;
-                               if (flow->_claim)
-                               {
-                                       // Remove the old claim
-                                       
flow->_claim->decreaseFlowFileRecordOwnedCount();
-                                       flow->_claim = NULL;
-                               }
-                               flow->_claim = claim;
-                               claim->increaseFlowFileRecordOwnedCount();
-                               /*
-                               _logger->log_debug("Import offset %d length %d 
into content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
-                               fs.close();
-                               input.close();
-                               if (!keepSource)
-                                       std::remove(source.c_str());
-                       }
-                       else
-                       {
-                               fs.close();
-                               input.close();
-                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Import Error");
-                       }
-               }
-               else
-               {
-                       throw Exception(FILE_OPERATION_EXCEPTION, "File Import 
Error");
-               }
-
-               delete[] buf;
-       }
-       catch (std::exception &exception)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               _logger->log_debug("Caught Exception %s", exception.what());
-               delete[] buf;
-               throw;
-       }
-       catch (...)
-       {
-               if (flow && flow->_claim == claim)
-               {
-                       flow->_claim->decreaseFlowFileRecordOwnedCount();
-                       flow->_claim = NULL;
-               }
-               if (claim)
-                       delete claim;
-               _logger->log_debug("Caught Exception during process session 
write");
-               delete[] buf;
-               throw;
-       }
-}
-
-void ProcessSession::commit()
-{
-       try
-       {
-               // First we clone the flow record based on the transfered 
relationship for updated flow record
-               std::map<std::string, FlowFileRecord *>::iterator it;
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_markedDelete)
-                               continue;
-                       std::map<std::string, Relationship>::iterator 
itRelationship =
-                                       
this->_transferRelationship.find(record->getUUIDStr());
-                       if (itRelationship != _transferRelationship.end())
-                       {
-                               Relationship relationship = 
itRelationship->second;
-                               // Find the relationship, we need to find the 
connections for that relationship
-                               std::set<Connection *> connections =
-                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-                               if (connections.empty())
-                               {
-                                       // No connection
-                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
-                                       {
-                                               // Not autoterminate, we should 
have the connect
-                                               std::string message = "Connect 
empty for non auto terminated relationship" + relationship.getName();
-                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-                                       }
-                                       else
-                                       {
-                                               // Autoterminated
-                                               remove(record);
-                                       }
-                               }
-                               else
-                               {
-                                       // We connections, clone the flow and 
assign the connection accordingly
-                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
-                                       {
-                                               Connection 
*connection(*itConnection);
-                                               if (itConnection == 
connections.begin())
-                                               {
-                                                       // First connection 
which the flow need be routed to
-                                                       record->_connection = 
connection;
-                                               }
-                                               else
-                                               {
-                                                       // Clone the flow file 
and route to the connection
-                                                       FlowFileRecord 
*cloneRecord;
-                                                       cloneRecord = 
this->cloneDuringTransfer(record);
-                                                       if (cloneRecord)
-                                                               
cloneRecord->_connection = connection;
-                                                       else
-                                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-                                               }
-                                       }
-                               }
-                       }
-                       else
-                       {
-                               // Can not find relationship for the flow
-                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
-                       }
-               }
-
-               // Do the samething for added flow file
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_markedDelete)
-                               continue;
-                       std::map<std::string, Relationship>::iterator 
itRelationship =
-                                       
this->_transferRelationship.find(record->getUUIDStr());
-                       if (itRelationship != _transferRelationship.end())
-                       {
-                               Relationship relationship = 
itRelationship->second;
-                               // Find the relationship, we need to find the 
connections for that relationship
-                               std::set<Connection *> connections =
-                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
-                               if (connections.empty())
-                               {
-                                       // No connection
-                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
-                                       {
-                                               // Not autoterminate, we should 
have the connect
-                                               std::string message = "Connect 
empty for non auto terminated relationship " + relationship.getName();
-                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
-                                       }
-                                       else
-                                       {
-                                               // Autoterminated
-                                               remove(record);
-                                       }
-                               }
-                               else
-                               {
-                                       // We connections, clone the flow and 
assign the connection accordingly
-                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
-                                       {
-                                               Connection 
*connection(*itConnection);
-                                               if (itConnection == 
connections.begin())
-                                               {
-                                                       // First connection 
which the flow need be routed to
-                                                       record->_connection = 
connection;
-                                               }
-                                               else
-                                               {
-                                                       // Clone the flow file 
and route to the connection
-                                                       FlowFileRecord 
*cloneRecord;
-                                                       cloneRecord = 
this->cloneDuringTransfer(record);
-                                                       if (cloneRecord)
-                                                               
cloneRecord->_connection = connection;
-                                                       else
-                                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
-                                               }
-                                       }
-                               }
-                       }
-                       else
-                       {
-                               // Can not find relationship for the flow
-                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
-                       }
-               }
-
-               // Complete process the added and update flow files for the 
session, send the flow file to its queue
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               // Process the clone flow files
-               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_markedDelete)
-                       {
-                               continue;
-                       }
-                       if (record->_connection)
-                               record->_connection->put(record);
-                       else
-                               delete record;
-               }
-               // Delete the deleted flow files
-               for (it = _deletedFlowFiles.begin(); it!= 
_deletedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       delete record;
-               }
-               // Delete the snapshot
-               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       delete record;
-               }
-               // All done
-               _updatedFlowFiles.clear();
-               _addedFlowFiles.clear();
-               _clonedFlowFiles.clear();
-               _deletedFlowFiles.clear();
-               _originalFlowFiles.clear();
-               _logger->log_trace("ProcessSession committed for %s", 
_processContext->getProcessor()->getName().c_str());
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process session 
commit");
-               throw;
-       }
-}
-
-
-void ProcessSession::rollback()
-{
-       try
-       {
-               std::map<std::string, FlowFileRecord *>::iterator it;
-               // Requeue the snapshot of the flowfile back
-               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       if (record->_orginalConnection)
-                       {
-                               record->_snapshot = false;
-                               record->_orginalConnection->put(record);
-                       }
-                       else
-                               delete record;
-               }
-               _originalFlowFiles.clear();
-               // Process the clone flow files
-               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       delete record;
-               }
-               _clonedFlowFiles.clear();
-               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       delete record;
-               }
-               _addedFlowFiles.clear();
-               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
-               {
-                       FlowFileRecord *record = it->second;
-                       delete record;
-               }
-               _updatedFlowFiles.clear();
-               _deletedFlowFiles.clear();
-               _logger->log_trace("ProcessSession rollback for %s", 
_processContext->getProcessor()->getName().c_str());
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during process session 
roll back");
-               throw;
-       }
-}
-
-FlowFileRecord *ProcessSession::get()
-{
-       Connection *first = 
_processContext->getProcessor()->getNextIncomingConnection();
-
-       if (first == NULL)
-               return NULL;
-
-       Connection *current = first;
-
-       do
-       {
-               std::set<FlowFileRecord *> expired;
-               FlowFileRecord *ret = current->poll(expired);
-               if (expired.size() > 0)
-               {
-                       // Remove expired flow record
-                       for (std::set<FlowFileRecord *>::iterator it = 
expired.begin(); it != expired.end(); ++it)
-                       {
-                               delete (*it);
-                       }
-               }
-               if (ret)
-               {
-                       // add the flow record to the current process session 
update map
-                       ret->_markedDelete = false;
-                       _updatedFlowFiles[ret->getUUIDStr()] = ret;
-                       std::map<std::string, std::string> empty;
-                       FlowFileRecord *snapshot = new FlowFileRecord(empty);
-                       _logger->log_debug("Create Snapshot FlowFile with UUID 
%s", snapshot->getUUIDStr().c_str());
-                       snapshot->duplicate(ret);
-                       // save a snapshot
-                       _originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
-                       return ret;
-               }
-               current = 
_processContext->getProcessor()->getNextIncomingConnection();
-       }
-       while (current != NULL && current != first);
-
-       return NULL;
-}
-

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/src/Processor.cpp b/src/Processor.cpp
deleted file mode 100644
index cc136dc..0000000
--- a/src/Processor.cpp
+++ /dev/null
@@ -1,451 +0,0 @@
-/**
- * @file Processor.cpp
- * Processor class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-
-#include "Processor.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-Processor::Processor(std::string name, uuid_t uuid)
-: _name(name)
-{
-       if (!uuid)
-               // Generate the global UUID for the flow record
-               uuid_generate(_uuid);
-       else
-               uuid_copy(_uuid, uuid);
-
-       char uuidStr[37];
-       uuid_unparse(_uuid, uuidStr);
-       _uuidStr = uuidStr;
-
-       // Setup the default values
-       _state = DISABLED;
-       _strategy = TIMER_DRIVEN;
-       _lossTolerant = false;
-       _triggerWhenEmpty = false;
-       _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
-       _runDurantionNano = 0;
-       _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
-       _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
-       _maxConcurrentTasks = 1;
-       _activeTasks = 0;
-       _yieldExpiration = 0;
-       _incomingConnectionsIter = this->_incomingConnections.begin();
-       _logger = Logger::getLogger();
-
-       _logger->log_info("Processor %s created UUID %s", _name.c_str(), 
_uuidStr.c_str());
-}
-
-Processor::~Processor()
-{
-
-}
-
-bool Processor::isRunning()
-{
-       return (_state == RUNNING && _activeTasks > 0);
-}
-
-bool Processor::setSupportedProperties(std::set<Property> properties)
-{
-       if (isRunning())
-       {
-               _logger->log_info("Can not set processor property while the 
process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _properties.clear();
-       for (std::set<Property>::iterator it = properties.begin(); it != 
properties.end(); ++it)
-       {
-               Property item(*it);
-               _properties[item.getName()] = item;
-               _logger->log_info("Processor %s supported property name %s", 
_name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
-{
-       if (isRunning())
-       {
-               _logger->log_info("Can not set processor supported relationship 
while the process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _relationships.clear();
-       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
-       {
-               Relationship item(*it);
-               _relationships[item.getName()] = item;
-               _logger->log_info("Processor %s supported relationship name 
%s", _name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::setAutoTerminatedRelationships(std::set<Relationship> 
relationships)
-{
-       if (isRunning())
-       {
-               _logger->log_info("Can not set processor auto terminated 
relationship while the process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       _autoTerminatedRelationships.clear();
-       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
-       {
-               Relationship item(*it);
-               _autoTerminatedRelationships[item.getName()] = item;
-               _logger->log_info("Processor %s auto terminated relationship 
name %s", _name.c_str(), item.getName().c_str());
-       }
-
-       return true;
-}
-
-bool Processor::isAutoTerminated(Relationship relationship)
-{
-       bool isRun = isRunning();
-
-       if (!isRun)
-               _mtx.lock();
-
-       std::map<std::string, Relationship>::iterator it = 
_autoTerminatedRelationships.find(relationship.getName());
-       if (it != _autoTerminatedRelationships.end())
-       {
-               if (!isRun)
-                       _mtx.unlock();
-               return true;
-       }
-       else
-       {
-               if (!isRun)
-                       _mtx.unlock();
-               return false;
-       }
-}
-
-bool Processor::isSupportedRelationship(Relationship relationship)
-{
-       bool isRun = isRunning();
-
-       if (!isRun)
-               _mtx.lock();
-
-       std::map<std::string, Relationship>::iterator it = 
_relationships.find(relationship.getName());
-       if (it != _relationships.end())
-       {
-               if (!isRun)
-                       _mtx.unlock();
-               return true;
-       }
-       else
-       {
-               if (!isRun)
-                       _mtx.unlock();
-               return false;
-       }
-}
-
-bool Processor::getProperty(std::string name, std::string &value)
-{
-       bool isRun = isRunning();
-
-       if (!isRun)
-               // Because set property only allowed in non running state, we 
need to obtain lock avoid rack condition
-               _mtx.lock();
-
-       std::map<std::string, Property>::iterator it = _properties.find(name);
-       if (it != _properties.end())
-       {
-               Property item = it->second;
-               value = item.getValue();
-               if (!isRun)
-                       _mtx.unlock();
-               return true;
-       }
-       else
-       {
-               if (!isRun)
-                       _mtx.unlock();
-               return false;
-       }
-}
-
-bool Processor::setProperty(std::string name, std::string value)
-{
-
-       std::lock_guard<std::mutex> lock(_mtx);
-       std::map<std::string, Property>::iterator it = _properties.find(name);
-
-       if (it != _properties.end())
-       {
-               Property item = it->second;
-               item.setValue(value);
-               _properties[item.getName()] = item;
-               _logger->log_info("Processor %s property name %s value %s", 
_name.c_str(), item.getName().c_str(), value.c_str());
-               return true;
-       }
-       else
-       {
-               return false;
-       }
-}
-
-std::set<Connection *> Processor::getOutGoingConnections(std::string 
relationship)
-{
-       std::set<Connection *> empty;
-
-       std::map<std::string, std::set<Connection *>>::iterator it = 
_outGoingConnections.find(relationship);
-       if (it != _outGoingConnections.end())
-       {
-               return _outGoingConnections[relationship];
-       }
-       else
-       {
-               return empty;
-       }
-}
-
-bool Processor::addConnection(Connection *connection)
-{
-       bool ret = false;
-
-       if (isRunning())
-       {
-               _logger->log_info("Can not add connection while the process %s 
is running",
-                               _name.c_str());
-               return false;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       uuid_t srcUUID;
-       uuid_t destUUID;
-
-       connection->getSourceProcessorUUID(srcUUID);
-       connection->getDestinationProcessorUUID(destUUID);
-
-       if (uuid_compare(_uuid, destUUID) == 0)
-       {
-               // Connection is destination to the current processor
-               if (_incomingConnections.find(connection) == 
_incomingConnections.end())
-               {
-                       _incomingConnections.insert(connection);
-                       connection->setDestinationProcessor(this);
-                       _logger->log_info("Add connection %s into Processor %s 
incoming connection",
-                                       connection->getName().c_str(), 
_name.c_str());
-                       _incomingConnectionsIter = 
this->_incomingConnections.begin();
-                       ret = true;
-               }
-       }
-
-       if (uuid_compare(_uuid, srcUUID) == 0)
-       {
-               std::string relationship = 
connection->getRelationship().getName();
-               // Connection is source from the current processor
-               std::map<std::string, std::set<Connection *>>::iterator it =
-                               _outGoingConnections.find(relationship);
-               if (it != _outGoingConnections.end())
-               {
-                       // We already has connection for this relationship
-                       std::set<Connection *> existedConnection = it->second;
-                       if (existedConnection.find(connection) == 
existedConnection.end())
-                       {
-                               // We do not have the same connection for this 
relationship yet
-                               existedConnection.insert(connection);
-                               connection->setSourceProcessor(this);
-                               _outGoingConnections[relationship] = 
existedConnection;
-                               _logger->log_info("Add connection %s into 
Processor %s outgoing connection for relationship %s",
-                                                                               
                connection->getName().c_str(), _name.c_str(), 
relationship.c_str());
-                               ret = true;
-                       }
-               }
-               else
-               {
-                       // We do not have any outgoing connection for this 
relationship yet
-                       std::set<Connection *> newConnection;
-                       newConnection.insert(connection);
-                       connection->setSourceProcessor(this);
-                       _outGoingConnections[relationship] = newConnection;
-                       _logger->log_info("Add connection %s into Processor %s 
outgoing connection for relationship %s",
-                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
-                       ret = true;
-               }
-       }
-
-       return ret;
-}
-
-void Processor::removeConnection(Connection *connection)
-{
-       if (isRunning())
-       {
-               _logger->log_info("Can not remove connection while the process 
%s is running",
-                               _name.c_str());
-               return;
-       }
-
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       uuid_t srcUUID;
-       uuid_t destUUID;
-
-       connection->getSourceProcessorUUID(srcUUID);
-       connection->getDestinationProcessorUUID(destUUID);
-
-       if (uuid_compare(_uuid, destUUID) == 0)
-       {
-               // Connection is destination to the current processor
-               if (_incomingConnections.find(connection) != 
_incomingConnections.end())
-               {
-                       _incomingConnections.erase(connection);
-                       connection->setDestinationProcessor(NULL);
-                       _logger->log_info("Remove connection %s into Processor 
%s incoming connection",
-                                       connection->getName().c_str(), 
_name.c_str());
-                       _incomingConnectionsIter = 
this->_incomingConnections.begin();
-               }
-       }
-
-       if (uuid_compare(_uuid, srcUUID) == 0)
-       {
-               std::string relationship = 
connection->getRelationship().getName();
-               // Connection is source from the current processor
-               std::map<std::string, std::set<Connection *>>::iterator it =
-                               _outGoingConnections.find(relationship);
-               if (it == _outGoingConnections.end())
-               {
-                       return;
-               }
-               else
-               {
-                       if (_outGoingConnections[relationship].find(connection) 
!= _outGoingConnections[relationship].end())
-                       {
-                               
_outGoingConnections[relationship].erase(connection);
-                               connection->setSourceProcessor(NULL);
-                               _logger->log_info("Remove connection %s into 
Processor %s outgoing connection for relationship %s",
-                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
-                       }
-               }
-       }
-}
-
-Connection *Processor::getNextIncomingConnection()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_incomingConnections.size() == 0)
-               return NULL;
-
-       if (_incomingConnectionsIter == _incomingConnections.end())
-               _incomingConnectionsIter = _incomingConnections.begin();
-
-       Connection *ret = *_incomingConnectionsIter;
-       _incomingConnectionsIter++;
-
-       if (_incomingConnectionsIter == _incomingConnections.end())
-               _incomingConnectionsIter = _incomingConnections.begin();
-
-       return ret;
-}
-
-bool Processor::flowFilesQueued()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       if (_incomingConnections.size() == 0)
-               return false;
-
-       for (std::set<Connection *>::iterator it = 
_incomingConnections.begin(); it != _incomingConnections.end(); ++it)
-       {
-               Connection *connection = *it;
-               if (connection->getQueueSize() > 0)
-                       return true;
-       }
-
-       return false;
-}
-
-bool Processor::flowFilesOutGoingFull()
-{
-       std::lock_guard<std::mutex> lock(_mtx);
-
-       std::map<std::string, std::set<Connection *>>::iterator it;
-
-       for (it = _outGoingConnections.begin(); it != 
_outGoingConnections.end(); ++it)
-       {
-               // We already has connection for this relationship
-               std::set<Connection *> existedConnection = it->second;
-               for (std::set<Connection *>::iterator itConnection = 
existedConnection.begin(); itConnection != existedConnection.end(); 
++itConnection)
-               {
-                       Connection *connection = *itConnection;
-                       if (connection->isFull())
-                               return true;
-               }
-       }
-
-       return false;
-}
-
-void Processor::onTrigger()
-{
-       ProcessContext *context = new ProcessContext(this);
-       ProcessSession *session = new ProcessSession(context);
-       try {
-               // Call the child onTrigger function
-               this->onTrigger(context, session);
-               session->commit();
-               delete session;
-               delete context;
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               session->rollback();
-               delete session;
-               delete context;
-               throw;
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception Processor::onTrigger");
-               session->rollback();
-               delete session;
-               delete context;
-               throw;
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/src/RealTimeDataCollector.cpp b/src/RealTimeDataCollector.cpp
deleted file mode 100644
index c7118ff..0000000
--- a/src/RealTimeDataCollector.cpp
+++ /dev/null
@@ -1,482 +0,0 @@
-/**
- * @file RealTimeDataCollector.cpp
- * RealTimeDataCollector class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <random>
-#include <netinet/tcp.h>
-
-#include "RealTimeDataCollector.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string 
RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
-Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real 
time processor to process", "data.osp");
-Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", 
"Real Time Server Name", "localhost");
-Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", 
"Real Time Server Port", "10000");
-Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch 
Server Name", "localhost");
-Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch 
Server Port", "10001");
-Property RealTimeDataCollector::ITERATION("Iteration",
-               "If true, sample osp file will be iterated", "true");
-Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real 
Time Message ID", "41");
-Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message 
ID", "172, 30, 48");
-Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real 
Time Data Collection Interval in msec", "10 ms");
-Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch 
Processing Interval in msec", "100 ms");
-Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", 
"Batch Buffer Maximum size in bytes", "262144");
-Relationship RealTimeDataCollector::Success("success", "success operational on 
the flow record");
-
-void RealTimeDataCollector::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(FILENAME);
-       properties.insert(REALTIMESERVERNAME);
-       properties.insert(REALTIMESERVERPORT);
-       properties.insert(BATCHSERVERNAME);
-       properties.insert(BATCHSERVERPORT);
-       properties.insert(ITERATION);
-       properties.insert(REALTIMEMSGID);
-       properties.insert(BATCHMSGID);
-       properties.insert(REALTIMEINTERVAL);
-       properties.insert(BATCHINTERVAL);
-       properties.insert(BATCHMAXBUFFERSIZE);
-
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(Success);
-       setSupportedRelationships(relationships);
-
-}
-
-int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
-{
-       in_addr_t addr;
-       int sock = 0;
-       struct hostent *h;
-#ifdef __MACH__
-       h = gethostbyname(host);
-#else
-       char buf[1024];
-       struct hostent he;
-       int hh_errno;
-       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-       sock = socket(AF_INET, SOCK_STREAM, 0);
-       if (sock < 0)
-       {
-               _logger->log_error("Could not create socket to hostName %s", 
host);
-               return 0;
-       }
-
-#ifndef __MACH__
-       int opt = 1;
-       bool nagle_off = true;
-
-       if (nagle_off)
-       {
-               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, 
sizeof(opt)) < 0)
-               {
-                       _logger->log_error("setsockopt() TCP_NODELAY failed");
-                       close(sock);
-                       return 0;
-               }
-               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-                               (char *)&opt, sizeof(opt)) < 0)
-               {
-                       _logger->log_error("setsockopt() SO_REUSEADDR failed");
-                       close(sock);
-                       return 0;
-               }
-       }
-
-       int sndsize = 256*1024;
-       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, 
(int)sizeof(sndsize)) < 0)
-       {
-               _logger->log_error("setsockopt() SO_SNDBUF failed");
-               close(sock);
-               return 0;
-       }
-#endif
-
-       struct sockaddr_in sa;
-       socklen_t socklen;
-       int status;
-
-       //TODO bind socket to the interface
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = htonl(INADDR_ANY);
-       sa.sin_port = htons(0);
-       socklen = sizeof(sa);
-       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
-       {
-               _logger->log_error("socket bind failed");
-               close(sock);
-               return 0;
-       }
-
-       memset(&sa, 0, sizeof(sa));
-       sa.sin_family = AF_INET;
-       sa.sin_addr.s_addr = addr;
-       sa.sin_port = htons(port);
-       socklen = sizeof(sa);
-
-       status = connect(sock, (struct sockaddr *)&sa, socklen);
-
-       if (status < 0)
-       {
-               _logger->log_error("socket connect failed to %s %d", host, 
port);
-               close(sock);
-               return 0;
-       }
-
-       _logger->log_info("socket %d connect to server %s port %d success", 
sock, host, port);
-
-       return sock;
-}
-
-int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
-{
-       int ret = 0, bytes = 0;
-
-       while (bytes < buflen)
-       {
-               ret = send(socket, buf+bytes, buflen-bytes, 0);
-               //check for errors
-               if (ret == -1)
-               {
-                       return ret;
-               }
-               bytes+=ret;
-       }
-
-       if (ret)
-               _logger->log_debug("Send data size %d over socket %d", buflen, 
socket);
-
-       return ret;
-}
-
-void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, 
ProcessSession *session)
-{
-       if (_realTimeAccumulated >= this->_realTimeInterval)
-       {
-               std::string value;
-               if (this->getProperty(REALTIMEMSGID.getName(), value))
-               {
-                       this->_realTimeMsgID.clear();
-                       this->_logger->log_info("Real Time Msg IDs %s", 
value.c_str());
-                       std::stringstream lineStream(value);
-                       std::string cell;
-
-                       while(std::getline(lineStream, cell, ','))
-                   {
-                       this->_realTimeMsgID.push_back(cell);
-                       // this->_logger->log_debug("Real Time Msg ID %s", 
cell.c_str());
-                   }
-               }
-               if (this->getProperty(BATCHMSGID.getName(), value))
-               {
-                       this->_batchMsgID.clear();
-                       this->_logger->log_info("Batch Msg IDs %s", 
value.c_str());
-                       std::stringstream lineStream(value);
-                       std::string cell;
-
-                       while(std::getline(lineStream, cell, ','))
-                   {
-                               cell = Property::trim(cell);
-                       this->_batchMsgID.push_back(cell);
-                       // this->_logger->log_debug("Batch Msg ID %s", 
cell.c_str());
-                   }
-               }
-               // _logger->log_info("onTriggerRealTime");
-               // Open the file
-               if (!this->_fileStream.is_open())
-               {
-                       _fileStream.open(this->_fileName.c_str(), 
std::ifstream::in);
-                       if (this->_fileStream.is_open())
-                               _logger->log_debug("open %s", 
_fileName.c_str());
-               }
-               if (!_fileStream.good())
-               {
-                       _logger->log_error("load data file failed %s", 
_fileName.c_str());
-                       return;
-               }
-               if (this->_fileStream.is_open())
-               {
-                       std::string line;
-
-                       while (std::getline(_fileStream, line))
-                       {
-                               line += "\n";
-                               std::stringstream lineStream(line);
-                               std::string cell;
-                               if (std::getline(lineStream, cell, ','))
-                               {
-                                       cell = Property::trim(cell);
-                                       // Check whether it match to the batch 
traffic
-                                       for (std::vector<std::string>::iterator 
it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it)
-                                       {
-                                               if (cell == *it)
-                                               {
-                                                       // push the batch data 
to the queue
-                                                       
std::lock_guard<std::mutex> lock(_mtx);
-                                                       while ((_queuedDataSize 
+ line.size()) > _batchMaxBufferSize)
-                                                       {
-                                                               std::string 
item = _queue.front();
-                                                               _queuedDataSize 
-= item.size();
-                                                               
_logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", 
item.size(), _queuedDataSize);
-                                                               _queue.pop();
-                                                       }
-                                                       _queue.push(line);
-                                                       _queuedDataSize += 
line.size();
-                                                       
_logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size 
%d", cell.c_str(), _queuedDataSize);
-                                               }
-                                       }
-                                       bool findRealTime = false;
-                                       // Check whether it match to the real 
time traffic
-                                       for (std::vector<std::string>::iterator 
it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it)
-                                       {
-                                               if (cell == *it)
-                                               {
-                                                       int status = 0;
-                                                       if 
(this->_realTimeSocket <= 0)
-                                                       {
-                                                               // Connect the 
LTE socket
-                                                               uint16_t port = 
_realTimeServerPort;
-                                                               
this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
-                                                       }
-                                                       if 
(this->_realTimeSocket)
-                                                       {
-                                                               // try to send 
the data
-                                                               status = 
sendData(_realTimeSocket, line.data(), line.size());
-                                                               if (status < 0)
-                                                               {
-                                                                       
close(_realTimeSocket);
-                                                                       
_realTimeSocket = 0;
-                                                               }
-                                                       }
-                                                       if 
(this->_realTimeSocket <= 0 || status < 0)
-                                                       {
-                                                               // push the 
batch data to the queue
-                                                               
std::lock_guard<std::mutex> lock(_mtx);
-                                                               while 
((_queuedDataSize + line.size()) > _batchMaxBufferSize)
-                                                               {
-                                                                       
std::string item = _queue.front();
-                                                                       
_queuedDataSize -= item.size();
-                                                                       
_logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", 
item.size(), _queuedDataSize);
-                                                                       
_queue.pop();
-                                                               }
-                                                               
_queue.push(line);
-                                                               _queuedDataSize 
+= line.size();
-                                                               
_logger->log_debug("Push real time msg ID %s into batch queue, queue buffer 
size %d", cell.c_str(), _queuedDataSize);
-                                                       }
-                                                       // find real time
-                                                       findRealTime = true;
-                                               } // cell
-                                       } // for real time pattern
-                                       if (findRealTime)
-                                               // we break the while once we 
find the first real time
-                                               break;
-                               }  // if get line
-                       } // while
-                       if (_fileStream.eof())
-                       {
-                               _fileStream.close();
-                       }
-               } // if open
-               _realTimeAccumulated = 0;
-       }
-       _realTimeAccumulated += 
context->getProcessor()->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, 
ProcessSession *session)
-{
-       if (_batchAcccumulated >= this->_batchInterval)
-       {
-               // _logger->log_info("onTriggerBatch");
-               // dequeue the batch and send over WIFI
-               int status = 0;
-               if (this->_batchSocket <= 0)
-               {
-                       // Connect the WIFI socket
-                       uint16_t port = _batchServerPort;
-                       this->_batchSocket = 
connectServer(_batchServerName.c_str(), port);
-               }
-               if (this->_batchSocket)
-               {
-                       std::lock_guard<std::mutex> lock(_mtx);
-
-                       while (!_queue.empty())
-                       {
-                               std::string line = _queue.front();
-                               status = sendData(_batchSocket, line.data(), 
line.size());
-                               _queue.pop();
-                               _queuedDataSize -= line.size();
-                               if (status < 0)
-                               {
-                                       close(_batchSocket);
-                                       _batchSocket = 0;
-                                       break;
-                               }
-                       }
-               }
-               _batchAcccumulated = 0;
-       }
-       _batchAcccumulated += 
context->getProcessor()->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession 
*session)
-{
-       std::thread::id id = std::this_thread::get_id();
-
-       if (id == _realTimeThreadId)
-               return onTriggerRealTime(context, session);
-       else if (id == _batchThreadId)
-               return onTriggerBatch(context, session);
-       else
-       {
-               std::lock_guard<std::mutex> lock(_mtx);
-               if (!this->_firstInvoking)
-               {
-                       this->_fileName = "data.osp";
-                       std::string value;
-                       if (this->getProperty(FILENAME.getName(), value))
-                       {
-                               this->_fileName = value;
-                               this->_logger->log_info("Data Collector File 
Name %s", _fileName.c_str());
-                       }
-                       this->_realTimeServerName = "localhost";
-                       if (this->getProperty(REALTIMESERVERNAME.getName(), 
value))
-                       {
-                               this->_realTimeServerName = value;
-                               this->_logger->log_info("Real Time Server Name 
%s", this->_realTimeServerName.c_str());
-                       }
-                       this->_realTimeServerPort = 10000;
-                       if (this->getProperty(REALTIMESERVERPORT.getName(), 
value))
-                       {
-                               Property::StringToInt(value, 
_realTimeServerPort);
-                               this->_logger->log_info("Real Time Server Port 
%d", _realTimeServerPort);
-                       }
-                       if (this->getProperty(BATCHSERVERNAME.getName(), value))
-                       {
-                               this->_batchServerName = value;
-                               this->_logger->log_info("Batch Server Name %s", 
this->_batchServerName.c_str());
-                       }
-                       this->_batchServerPort = 10001;
-                       if (this->getProperty(BATCHSERVERPORT.getName(), value))
-                       {
-                               Property::StringToInt(value, _batchServerPort);
-                               this->_logger->log_info("Batch Server Port %d", 
_batchServerPort);
-                       }
-                       if (this->getProperty(ITERATION.getName(), value))
-                       {
-                               Property::StringToBool(value, this->_iteration);
-                               _logger->log_info("Iteration %d", _iteration);
-                       }
-                       this->_realTimeInterval = 10000000; //10 msec
-                       if (this->getProperty(REALTIMEINTERVAL.getName(), 
value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_realTimeInterval, unit) &&
-                                                               
Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
-                               {
-                                       _logger->log_info("Real Time Interval: 
[%d] ns", _realTimeInterval);
-                               }
-                       }
-                       this->_batchInterval = 100000000; //100 msec
-                       if (this->getProperty(BATCHINTERVAL.getName(), value))
-                       {
-                               TimeUnit unit;
-                               if (Property::StringToTime(value, 
_batchInterval, unit) &&
-                                                               
Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
-                               {
-                                       _logger->log_info("Batch Time Interval: 
[%d] ns", _batchInterval);
-                               }
-                       }
-                       this->_batchMaxBufferSize = 256*1024;
-                       if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), 
value))
-                       {
-                               Property::StringToInt(value, 
_batchMaxBufferSize);
-                               this->_logger->log_info("Batch Max Buffer Size 
%d", _batchMaxBufferSize);
-                       }
-                       if (this->getProperty(REALTIMEMSGID.getName(), value))
-                       {
-                               this->_logger->log_info("Real Time Msg IDs %s", 
value.c_str());
-                               std::stringstream lineStream(value);
-                               std::string cell;
-
-                               while(std::getline(lineStream, cell, ','))
-                           {
-                               this->_realTimeMsgID.push_back(cell);
-                               this->_logger->log_info("Real Time Msg ID %s", 
cell.c_str());
-                           }
-                       }
-                       if (this->getProperty(BATCHMSGID.getName(), value))
-                       {
-                               this->_logger->log_info("Batch Msg IDs %s", 
value.c_str());
-                               std::stringstream lineStream(value);
-                               std::string cell;
-
-                               while(std::getline(lineStream, cell, ','))
-                           {
-                                       cell = Property::trim(cell);
-                               this->_batchMsgID.push_back(cell);
-                               this->_logger->log_info("Batch Msg ID %s", 
cell.c_str());
-                           }
-                       }
-                       // Connect the LTE socket
-                       uint16_t port = _realTimeServerPort;
-
-                       this->_realTimeSocket = 
connectServer(_realTimeServerName.c_str(), port);
-
-                       // Connect the WIFI socket
-                       port = _batchServerPort;
-
-                       this->_batchSocket = 
connectServer(_batchServerName.c_str(), port);
-
-                       // Open the file
-                       _fileStream.open(this->_fileName.c_str(), 
std::ifstream::in);
-                       if (!_fileStream.good())
-                       {
-                               _logger->log_error("load data file failed %s", 
_fileName.c_str());
-                               return;
-                       }
-                       else
-                       {
-                               _logger->log_debug("open %s", 
_fileName.c_str());
-                       }
-                       _realTimeThreadId = id;
-                       this->_firstInvoking = true;
-               }
-               else
-               {
-                       if (id != _realTimeThreadId)
-                               _batchThreadId = id;
-                       this->_firstInvoking = false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp
deleted file mode 100644
index 9d849ae..0000000
--- a/src/RemoteProcessorGroupPort.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * @file RemoteProcessorGroupPort.cpp
- * RemoteProcessorGroupPort class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <sstream>
-#include <string.h>
-#include <iostream>
-
-#include "TimeUtil.h"
-#include "RemoteProcessorGroupPort.h"
-#include "ProcessContext.h"
-#include "ProcessSession.h"
-
-const std::string 
RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
-Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", 
"localhost");
-Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
-Relationship RemoteProcessorGroupPort::relation;
-
-void RemoteProcessorGroupPort::initialize()
-{
-       //! Set the supported properties
-       std::set<Property> properties;
-       properties.insert(hostName);
-       properties.insert(port);
-       setSupportedProperties(properties);
-       //! Set the supported relationships
-       std::set<Relationship> relationships;
-       relationships.insert(relation);
-       setSupportedRelationships(relationships);
-}
-
-void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, 
ProcessSession *session)
-{
-       std::string value;
-
-       if (!_transmitting)
-               return;
-
-       std::string host = _peer->getHostName();
-       uint16_t sport = _peer->getPort();
-       int64_t lvalue;
-       bool needReset = false;
-
-       if (context->getProperty(hostName.getName(), value))
-       {
-               host = value;
-       }
-       if (context->getProperty(port.getName(), value) && 
Property::StringToInt(value, lvalue))
-       {
-               sport = (uint16_t) lvalue;
-       }
-       if (host != _peer->getHostName())
-       {
-               _peer->setHostName(host);
-               needReset= true;
-       }
-       if (sport != _peer->getPort())
-       {
-               _peer->setPort(sport);
-               needReset = true;
-       }
-       if (needReset)
-               _protocol->tearDown();
-
-       if (!_protocol->bootstrap())
-       {
-               // bootstrap the client protocol if needeed
-               context->yield();
-               _logger->log_error("Site2Site bootstrap failed yield period %d 
peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), 
_protocol->getPeer()->getTimeOut());
-               return;
-       }
-
-       if (_direction == RECEIVE)
-               _protocol->receiveFlowFiles(context, session);
-       else
-               _protocol->transferFlowFiles(context, session);
-
-       return;
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/src/ResourceClaim.cpp b/src/ResourceClaim.cpp
deleted file mode 100644
index 3c22ac9..0000000
--- a/src/ResourceClaim.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * @file ResourceClaim.cpp
- * ResourceClaim class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-
-#include "ResourceClaim.h"
-
-std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
-
-ResourceClaim::ResourceClaim(const std::string contentDirectory)
-: _id(_localResourceClaimNumber.load()),
-  _flowFileRecordOwnedCount(0)
-{
-       char uuidStr[37];
-
-       // Generate the global UUID for the resource claim
-       uuid_generate(_uuid);
-       // Increase the local ID for the resource claim
-       ++_localResourceClaimNumber;
-       uuid_unparse(_uuid, uuidStr);
-       // Create the full content path for the content
-       _contentFullPath = contentDirectory + "/" + uuidStr;
-
-       _configure = Configure::getConfigure();
-       _logger = Logger::getLogger();
-       _logger->log_debug("Resource Claim created %s", 
_contentFullPath.c_str());
-}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp
deleted file mode 100644
index 211c328..0000000
--- a/src/SchedulingAgent.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * @file SchedulingAgent.cpp
- * SchedulingAgent class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <chrono>
-#include <thread>
-#include <iostream>
-#include "Exception.h"
-#include "SchedulingAgent.h"
-
-bool SchedulingAgent::hasWorkToDo(Processor *processor)
-{
-       // Whether it has work to do
-       if (processor->getTriggerWhenEmpty() || 
!processor->hasIncomingConnections() ||
-                       processor->flowFilesQueued())
-               return true;
-       else
-               return false;
-}
-
-bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor)
-{
-       return processor->flowFilesOutGoingFull();
-}
-
-bool SchedulingAgent::onTrigger(Processor *processor)
-{
-       if (processor->isYield())
-               return false;
-
-       // No need to yield, reset yield expiration to 0
-       processor->clearYield();
-
-       if (!hasWorkToDo(processor))
-               // No work to do, yield
-               return true;
-
-       if(hasTooMuchOutGoing(processor))
-               // need to apply backpressure
-               return true;
-
-       //TODO runDuration
-
-       processor->incrementActiveTasks();
-       try
-       {
-               processor->onTrigger();
-               processor->decrementActiveTask();
-       }
-       catch (Exception &exception)
-       {
-               // Normal exception
-               _logger->log_debug("Caught Exception %s", exception.what());
-               processor->decrementActiveTask();
-       }
-       catch (std::exception &exception)
-       {
-               _logger->log_debug("Caught Exception %s", exception.what());
-               processor->yield(_administrativeYieldDuration);
-               processor->decrementActiveTask();
-       }
-       catch (...)
-       {
-               _logger->log_debug("Caught Exception during 
SchedulingAgent::onTrigger");
-               processor->yield(_administrativeYieldDuration);
-               processor->decrementActiveTask();
-       }
-
-       return false;
-}
-

Reply via email to