http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp deleted file mode 100644 index 82130f8..0000000 --- a/src/LogAttribute.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/** - * @file LogAttribute.cpp - * LogAttribute class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <sstream> -#include <string.h> -#include <iostream> - -#include "TimeUtil.h" -#include "LogAttribute.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string LogAttribute::ProcessorName("LogAttribute"); -Property LogAttribute::LogLevel("Log Level", "The Log Level to use when logging the Attributes", "info"); -Property LogAttribute::AttributesToLog("Attributes to Log", "A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.", ""); -Property LogAttribute::AttributesToIgnore("Attributes to Ignore", "A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.", ""); -Property LogAttribute::LogPayload("Log Payload", - "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", "false"); -Property LogAttribute::LogPrefix("Log prefix", - "Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.", ""); -Relationship LogAttribute::Success("success", "success operational on the flow record"); - -void LogAttribute::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(LogLevel); - properties.insert(AttributesToLog); - properties.insert(AttributesToIgnore); - properties.insert(LogPayload); - properties.insert(LogPrefix); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string dashLine = "--------------------------------------------------"; - LogAttrLevel level = LogAttrLevelInfo; - bool logPayload = false; - std::ostringstream message; - - FlowFileRecord *flow = session->get(); - - if (!flow) - return; - - std::string value; - if (context->getProperty(LogLevel.getName(), value)) - { - logLevelStringToEnum(value, level); - } - if (context->getProperty(LogPrefix.getName(), value)) - { - dashLine = "-----" + value + "-----"; - } - if (context->getProperty(LogPayload.getName(), value)) - { - Property::StringToBool(value, logPayload); - } - - message << "Logging for flow file " << "\n"; - message << dashLine; - message << "\nStandard FlowFile Attributes"; - message << "\n" << "UUID:" << flow->getUUIDStr(); - message << "\n" << "EntryDate:" << getTimeStr(flow->getEntryDate()); - message << "\n" << "lineageStartDate:" << getTimeStr(flow->getlineageStartDate()); - message << "\n" << "Size:" << flow->getSize() << " Offset:" << flow->getOffset(); - message << "\nFlowFile Attributes Map Content"; - std::map<std::string, std::string> attrs = flow->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = attrs.begin(); it!= attrs.end(); it++) - { - message << "\n" << "key:" << it->first << " value:" << it->second; - } - message << "\nFlowFile Resource Claim Content"; - ResourceClaim *claim = flow->getResourceClaim(); - if (claim) - { - message << "\n" << "Content Claim:" << claim->getContentFullPath(); - } - if (logPayload && flow->getSize() <= 1024*1024) - { - message << "\n" << "Payload:" << "\n"; - ReadCallback callback(flow->getSize()); - session->read(flow, &callback); - for (unsigned int i = 0, j = 0; i < callback._readSize; i++) - { - char temp[8]; - sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); - message << temp; - j++; - if (j == 16) - { - message << '\n'; - j = 0; - } - } - } - message << "\n" << dashLine << std::ends; - std::string output = message.str(); - - switch (level) - { - case LogAttrLevelInfo: - _logger->log_info("%s", output.c_str()); - break; - case LogAttrLevelDebug: - _logger->log_debug("%s", output.c_str()); - break; - case LogAttrLevelError: - _logger->log_error("%s", output.c_str()); - break; - case LogAttrLevelTrace: - _logger->log_trace("%s", output.c_str()); - break; - case LogAttrLevelWarn: - _logger->log_warn("%s", output.c_str()); - break; - default: - break; - } - - // Test Import - /* - FlowFileRecord *importRecord = session->create(); - session->import(claim->getContentFullPath(), importRecord); - session->transfer(importRecord, Success); */ - - - // Transfer to the relationship - session->transfer(flow, Success); -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Logger.cpp ---------------------------------------------------------------------- diff --git a/src/Logger.cpp b/src/Logger.cpp deleted file mode 100644 index 984f609..0000000 --- a/src/Logger.cpp +++ /dev/null @@ -1,27 +0,0 @@ -/** - * @file Logger.cpp - * Logger class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> - -#include "Logger.h" - -Logger *Logger::_logger(NULL); - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp deleted file mode 100644 index 70ee9d7..0000000 --- a/src/ProcessGroup.cpp +++ /dev/null @@ -1,314 +0,0 @@ -/** - * @file ProcessGroup.cpp - * ProcessGroup class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> - -#include "ProcessGroup.h" -#include "Processor.h" - -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup *parent) -: _name(name), - _type(type), - _parentProcessGroup(parent) -{ - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(_uuid); - else - uuid_copy(_uuid, uuid); - - _yieldPeriodMsec = 0; - _transmitting = false; - - _logger = Logger::getLogger(); - _logger->log_info("ProcessGroup %s created", _name.c_str()); -} - -ProcessGroup::~ProcessGroup() -{ - for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it) - { - Connection *connection = *it; - connection->drain(); - delete connection; - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - delete processGroup; - } - - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - delete processor; - } -} - -bool ProcessGroup::isRootProcessGroup() -{ - std::lock_guard<std::mutex> lock(_mtx); - return (_type == ROOT_PROCESS_GROUP); -} - -void ProcessGroup::addProcessor(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_processors.find(processor) == _processors.end()) - { - // We do not have the same processor in this process group yet - _processors.insert(processor); - _logger->log_info("Add processor %s into process group %s", - processor->getName().c_str(), _name.c_str()); - } -} - -void ProcessGroup::removeProcessor(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_processors.find(processor) != _processors.end()) - { - // We do have the same processor in this process group yet - _processors.erase(processor); - _logger->log_info("Remove processor %s from process group %s", - processor->getName().c_str(), _name.c_str()); - } -} - -void ProcessGroup::addProcessGroup(ProcessGroup *child) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_childProcessGroups.find(child) == _childProcessGroups.end()) - { - // We do not have the same child process group in this process group yet - _childProcessGroups.insert(child); - _logger->log_info("Add child process group %s into process group %s", - child->getName().c_str(), _name.c_str()); - } -} - -void ProcessGroup::removeProcessGroup(ProcessGroup *child) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_childProcessGroups.find(child) != _childProcessGroups.end()) - { - // We do have the same child process group in this process group yet - _childProcessGroups.erase(child); - _logger->log_info("Remove child process group %s from process group %s", - child->getName().c_str(), _name.c_str()); - } -} - -void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler) -{ - std::lock_guard<std::mutex> lock(_mtx); - - try - { - // Start all the processor node, input and output ports - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - if (!processor->isRunning() && processor->getScheduledState() != DISABLED) - { - if (processor->getSchedulingStrategy() == TIMER_DRIVEN) - timeScheduler->schedule(processor); - } - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - processGroup->startProcessing(timeScheduler); - } - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process group start processing"); - throw; - } -} - -void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler) -{ - std::lock_guard<std::mutex> lock(_mtx); - - try - { - // Stop all the processor node, input and output ports - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - if (processor->getSchedulingStrategy() == TIMER_DRIVEN) - timeScheduler->unschedule(processor); - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - processGroup->stopProcessing(timeScheduler); - } - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process group stop processing"); - throw; - } -} - -Processor *ProcessGroup::findProcessor(uuid_t uuid) -{ - Processor *ret = NULL; - // std::lock_guard<std::mutex> lock(_mtx); - - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - uuid_t processorUUID; - if (processor->getUUID(processorUUID) && uuid_compare(processorUUID, uuid) == 0) - return processor; - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - Processor *processor = processGroup->findProcessor(uuid); - if (processor) - return processor; - } - - return ret; -} - -Processor *ProcessGroup::findProcessor(std::string processorName) -{ - Processor *ret = NULL; - - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - _logger->log_debug("Current processor is %s", processor->getName().c_str()); - if (processor->getName() == processorName) - return processor; - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - Processor *processor = processGroup->findProcessor(processorName); - if (processor) - return processor; - } - - return ret; -} - -void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) -{ - std::lock_guard<std::mutex> lock(_mtx); - - for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) - { - Processor *processor(*it); - if (processor->getName() == processorName) - { - processor->setProperty(propertyName, propertyValue); - } - } - - for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) - { - ProcessGroup *processGroup(*it); - processGroup->updatePropertyValue(processorName, propertyName, propertyValue); - } - - return; -} - -void ProcessGroup::addConnection(Connection *connection) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_connections.find(connection) == _connections.end()) - { - // We do not have the same connection in this process group yet - _connections.insert(connection); - _logger->log_info("Add connection %s into process group %s", - connection->getName().c_str(), _name.c_str()); - uuid_t sourceUUID; - Processor *source = NULL; - connection->getSourceProcessorUUID(sourceUUID); - source = this->findProcessor(sourceUUID); - if (source) - source->addConnection(connection); - Processor *destination = NULL; - uuid_t destinationUUID; - connection->getDestinationProcessorUUID(destinationUUID); - destination = this->findProcessor(destinationUUID); - if (destination && destination != source) - destination->addConnection(connection); - } -} - -void ProcessGroup::removeConnection(Connection *connection) -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_connections.find(connection) != _connections.end()) - { - // We do not have the same connection in this process group yet - _connections.erase(connection); - _logger->log_info("Remove connection %s into process group %s", - connection->getName().c_str(), _name.c_str()); - uuid_t sourceUUID; - Processor *source = NULL; - connection->getSourceProcessorUUID(sourceUUID); - source = this->findProcessor(sourceUUID); - if (source) - source->removeConnection(connection); - Processor *destination = NULL; - uuid_t destinationUUID; - connection->getDestinationProcessorUUID(destinationUUID); - destination = this->findProcessor(destinationUUID); - if (destination && destination != source) - destination->removeConnection(connection); - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp deleted file mode 100644 index 4f526c3..0000000 --- a/src/ProcessSession.cpp +++ /dev/null @@ -1,731 +0,0 @@ -/** - * @file ProcessSession.cpp - * ProcessSession class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <iostream> - -#include "ProcessSession.h" - -FlowFileRecord* ProcessSession::create() -{ - std::map<std::string, std::string> empty; - FlowFileRecord *record = new FlowFileRecord(empty); - - if (record) - { - _addedFlowFiles[record->getUUIDStr()] = record; - _logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); - } - - return record; -} - -FlowFileRecord* ProcessSession::create(FlowFileRecord *parent) -{ - FlowFileRecord *record = this->create(); - if (record) - { - // Copy attributes - std::map<std::string, std::string> parentAttributes = parent->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) - { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || - it->first == FlowAttributeKey(DISCARD_REASON) || - it->first == FlowAttributeKey(UUID)) - // Do not copy special attributes from parent - continue; - record->setAttribute(it->first, it->second); - } - record->_lineageStartDate = parent->_lineageStartDate; - record->_lineageIdentifiers = parent->_lineageIdentifiers; - record->_lineageIdentifiers.insert(parent->_uuidStr); - - } - return record; -} - -FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent) -{ - FlowFileRecord *record = this->create(parent); - if (record) - { - // Copy Resource Claim - record->_claim = parent->_claim; - if (record->_claim) - { - record->_offset = parent->_offset; - record->_size = parent->_size; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - } - return record; -} - -FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent) -{ - std::map<std::string, std::string> empty; - FlowFileRecord *record = new FlowFileRecord(empty); - - if (record) - { - this->_clonedFlowFiles[record->getUUIDStr()] = record; - _logger->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str()); - // Copy attributes - std::map<std::string, std::string> parentAttributes = parent->getAttributes(); - std::map<std::string, std::string>::iterator it; - for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) - { - if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || - it->first == FlowAttributeKey(DISCARD_REASON) || - it->first == FlowAttributeKey(UUID)) - // Do not copy special attributes from parent - continue; - record->setAttribute(it->first, it->second); - } - record->_lineageStartDate = parent->_lineageStartDate; - record->_lineageIdentifiers = parent->_lineageIdentifiers; - record->_lineageIdentifiers.insert(parent->_uuidStr); - - // Copy Resource Claim - record->_claim = parent->_claim; - if (record->_claim) - { - record->_offset = parent->_offset; - record->_size = parent->_size; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - } - - return record; -} - -FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size) -{ - FlowFileRecord *record = this->create(parent); - if (record) - { - if (parent->_claim) - { - if ((offset + size) > (long) parent->_size) - { - // Set offset and size - _logger->log_error("clone offset %d and size %d exceed parent size %d", - offset, size, parent->_size); - // Remove the Add FlowFile for the session - std::map<std::string, FlowFileRecord *>::iterator it = - this->_addedFlowFiles.find(record->getUUIDStr()); - if (it != this->_addedFlowFiles.end()) - this->_addedFlowFiles.erase(record->getUUIDStr()); - delete record; - return NULL; - } - record->_offset = parent->_offset + parent->_offset; - record->_size = size; - // Copy Resource Claim - record->_claim = parent->_claim; - record->_claim->increaseFlowFileRecordOwnedCount(); - } - } - return record; -} - -void ProcessSession::remove(FlowFileRecord *flow) -{ - flow->_markedDelete = true; - _deletedFlowFiles[flow->getUUIDStr()] = flow; -} - -void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value) -{ - flow->setAttribute(key, value); -} - -void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key) -{ - flow->removeAttribute(key); -} - -void ProcessSession::penalize(FlowFileRecord *flow) -{ - flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec(); -} - -void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship) -{ - _transferRelationship[flow->getUUIDStr()] = relationship; -} - -void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) -{ - ResourceClaim *claim = NULL; - - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); - - try - { - std::ofstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - if (fs.is_open()) - { - // Call the callback to write the content - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) - { - flow->_size = fs.tellp(); - flow->_offset = 0; - if (flow->_claim) - { - // Remove the old claim - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - flow->_claim = claim; - claim->increaseFlowFileRecordOwnedCount(); - /* - _logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - _logger->log_debug("Caught Exception during process session write"); - throw; - } -} - -void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback) -{ - ResourceClaim *claim = NULL; - - if (flow->_claim == NULL) - { - // No existed claim for append, we need to create new claim - return write(flow, callback); - } - - claim = flow->_claim; - - try - { - std::ofstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); - if (fs.is_open()) - { - // Call the callback to write the content - std::streampos oldPos = fs.tellp(); - callback->process(&fs); - if (fs.good() && fs.tellp() >= 0) - { - uint64_t appendSize = fs.tellp() - oldPos; - flow->_size += appendSize; - /* - _logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", - flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process session append"); - throw; - } -} - -void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback) -{ - try - { - ResourceClaim *claim = NULL; - if (flow->_claim == NULL) - { - // No existed claim for read, we throw exception - throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); - } - - claim = flow->_claim; - std::ifstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); - if (fs.is_open()) - { - fs.seekg(flow->_offset, fs.beg); - - if (fs.good()) - { - callback->process(&fs); - /* - _logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", - flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - } - else - { - fs.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); - } - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process session read"); - throw; - } -} - -void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset) -{ - ResourceClaim *claim = NULL; - - claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); - char *buf = NULL; - int size = 4096; - buf = new char [size]; - - try - { - std::ofstream fs; - fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - std::ifstream input; - input.open(source.c_str(), std::fstream::in | std::fstream::binary); - - if (fs.is_open() && input.is_open()) - { - // Open the source file and stream to the flow file - input.seekg(offset, fs.beg); - while (input.good()) - { - input.read(buf, size); - if (input) - fs.write(buf, size); - else - fs.write(buf, input.gcount()); - } - - if (fs.good() && fs.tellp() >= 0) - { - flow->_size = fs.tellp(); - flow->_offset = 0; - if (flow->_claim) - { - // Remove the old claim - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - flow->_claim = claim; - claim->increaseFlowFileRecordOwnedCount(); - /* - _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ - fs.close(); - input.close(); - if (!keepSource) - std::remove(source.c_str()); - } - else - { - fs.close(); - input.close(); - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); - } - } - else - { - throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); - } - - delete[] buf; - } - catch (std::exception &exception) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - _logger->log_debug("Caught Exception %s", exception.what()); - delete[] buf; - throw; - } - catch (...) - { - if (flow && flow->_claim == claim) - { - flow->_claim->decreaseFlowFileRecordOwnedCount(); - flow->_claim = NULL; - } - if (claim) - delete claim; - _logger->log_debug("Caught Exception during process session write"); - delete[] buf; - throw; - } -} - -void ProcessSession::commit() -{ - try - { - // First we clone the flow record based on the transfered relationship for updated flow record - std::map<std::string, FlowFileRecord *>::iterator it; - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_markedDelete) - continue; - std::map<std::string, Relationship>::iterator itRelationship = - this->_transferRelationship.find(record->getUUIDStr()); - if (itRelationship != _transferRelationship.end()) - { - Relationship relationship = itRelationship->second; - // Find the relationship, we need to find the connections for that relationship - std::set<Connection *> connections = - _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); - if (connections.empty()) - { - // No connection - if (!_processContext->getProcessor()->isAutoTerminated(relationship)) - { - // Not autoterminate, we should have the connect - std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); - throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); - } - else - { - // Autoterminated - remove(record); - } - } - else - { - // We connections, clone the flow and assign the connection accordingly - for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) - { - Connection *connection(*itConnection); - if (itConnection == connections.begin()) - { - // First connection which the flow need be routed to - record->_connection = connection; - } - else - { - // Clone the flow file and route to the connection - FlowFileRecord *cloneRecord; - cloneRecord = this->cloneDuringTransfer(record); - if (cloneRecord) - cloneRecord->_connection = connection; - else - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); - } - } - } - } - else - { - // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); - } - } - - // Do the samething for added flow file - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_markedDelete) - continue; - std::map<std::string, Relationship>::iterator itRelationship = - this->_transferRelationship.find(record->getUUIDStr()); - if (itRelationship != _transferRelationship.end()) - { - Relationship relationship = itRelationship->second; - // Find the relationship, we need to find the connections for that relationship - std::set<Connection *> connections = - _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); - if (connections.empty()) - { - // No connection - if (!_processContext->getProcessor()->isAutoTerminated(relationship)) - { - // Not autoterminate, we should have the connect - std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); - throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); - } - else - { - // Autoterminated - remove(record); - } - } - else - { - // We connections, clone the flow and assign the connection accordingly - for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) - { - Connection *connection(*itConnection); - if (itConnection == connections.begin()) - { - // First connection which the flow need be routed to - record->_connection = connection; - } - else - { - // Clone the flow file and route to the connection - FlowFileRecord *cloneRecord; - cloneRecord = this->cloneDuringTransfer(record); - if (cloneRecord) - cloneRecord->_connection = connection; - else - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer"); - } - } - } - } - else - { - // Can not find relationship for the flow - throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); - } - } - - // Complete process the added and update flow files for the session, send the flow file to its queue - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_markedDelete) - { - continue; - } - if (record->_connection) - record->_connection->put(record); - else - delete record; - } - // Delete the deleted flow files - for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - delete record; - } - // Delete the snapshot - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - delete record; - } - // All done - _updatedFlowFiles.clear(); - _addedFlowFiles.clear(); - _clonedFlowFiles.clear(); - _deletedFlowFiles.clear(); - _originalFlowFiles.clear(); - _logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str()); - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process session commit"); - throw; - } -} - - -void ProcessSession::rollback() -{ - try - { - std::map<std::string, FlowFileRecord *>::iterator it; - // Requeue the snapshot of the flowfile back - for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - if (record->_orginalConnection) - { - record->_snapshot = false; - record->_orginalConnection->put(record); - } - else - delete record; - } - _originalFlowFiles.clear(); - // Process the clone flow files - for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - delete record; - } - _clonedFlowFiles.clear(); - for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - delete record; - } - _addedFlowFiles.clear(); - for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) - { - FlowFileRecord *record = it->second; - delete record; - } - _updatedFlowFiles.clear(); - _deletedFlowFiles.clear(); - _logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str()); - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception during process session roll back"); - throw; - } -} - -FlowFileRecord *ProcessSession::get() -{ - Connection *first = _processContext->getProcessor()->getNextIncomingConnection(); - - if (first == NULL) - return NULL; - - Connection *current = first; - - do - { - std::set<FlowFileRecord *> expired; - FlowFileRecord *ret = current->poll(expired); - if (expired.size() > 0) - { - // Remove expired flow record - for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it) - { - delete (*it); - } - } - if (ret) - { - // add the flow record to the current process session update map - ret->_markedDelete = false; - _updatedFlowFiles[ret->getUUIDStr()] = ret; - std::map<std::string, std::string> empty; - FlowFileRecord *snapshot = new FlowFileRecord(empty); - _logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); - snapshot->duplicate(ret); - // save a snapshot - _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; - return ret; - } - current = _processContext->getProcessor()->getNextIncomingConnection(); - } - while (current != NULL && current != first); - - return NULL; -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/src/Processor.cpp b/src/Processor.cpp deleted file mode 100644 index cc136dc..0000000 --- a/src/Processor.cpp +++ /dev/null @@ -1,451 +0,0 @@ -/** - * @file Processor.cpp - * Processor class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> - -#include "Processor.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -Processor::Processor(std::string name, uuid_t uuid) -: _name(name) -{ - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(_uuid); - else - uuid_copy(_uuid, uuid); - - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - _uuidStr = uuidStr; - - // Setup the default values - _state = DISABLED; - _strategy = TIMER_DRIVEN; - _lossTolerant = false; - _triggerWhenEmpty = false; - _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS; - _runDurantionNano = 0; - _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000; - _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; - _maxConcurrentTasks = 1; - _activeTasks = 0; - _yieldExpiration = 0; - _incomingConnectionsIter = this->_incomingConnections.begin(); - _logger = Logger::getLogger(); - - _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); -} - -Processor::~Processor() -{ - -} - -bool Processor::isRunning() -{ - return (_state == RUNNING && _activeTasks > 0); -} - -bool Processor::setSupportedProperties(std::set<Property> properties) -{ - if (isRunning()) - { - _logger->log_info("Can not set processor property while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _properties.clear(); - for (std::set<Property>::iterator it = properties.begin(); it != properties.end(); ++it) - { - Property item(*it); - _properties[item.getName()] = item; - _logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::setSupportedRelationships(std::set<Relationship> relationships) -{ - if (isRunning()) - { - _logger->log_info("Can not set processor supported relationship while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _relationships.clear(); - for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) - { - Relationship item(*it); - _relationships[item.getName()] = item; - _logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships) -{ - if (isRunning()) - { - _logger->log_info("Can not set processor auto terminated relationship while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - _autoTerminatedRelationships.clear(); - for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) - { - Relationship item(*it); - _autoTerminatedRelationships[item.getName()] = item; - _logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str()); - } - - return true; -} - -bool Processor::isAutoTerminated(Relationship relationship) -{ - bool isRun = isRunning(); - - if (!isRun) - _mtx.lock(); - - std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName()); - if (it != _autoTerminatedRelationships.end()) - { - if (!isRun) - _mtx.unlock(); - return true; - } - else - { - if (!isRun) - _mtx.unlock(); - return false; - } -} - -bool Processor::isSupportedRelationship(Relationship relationship) -{ - bool isRun = isRunning(); - - if (!isRun) - _mtx.lock(); - - std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName()); - if (it != _relationships.end()) - { - if (!isRun) - _mtx.unlock(); - return true; - } - else - { - if (!isRun) - _mtx.unlock(); - return false; - } -} - -bool Processor::getProperty(std::string name, std::string &value) -{ - bool isRun = isRunning(); - - if (!isRun) - // Because set property only allowed in non running state, we need to obtain lock avoid rack condition - _mtx.lock(); - - std::map<std::string, Property>::iterator it = _properties.find(name); - if (it != _properties.end()) - { - Property item = it->second; - value = item.getValue(); - if (!isRun) - _mtx.unlock(); - return true; - } - else - { - if (!isRun) - _mtx.unlock(); - return false; - } -} - -bool Processor::setProperty(std::string name, std::string value) -{ - - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, Property>::iterator it = _properties.find(name); - - if (it != _properties.end()) - { - Property item = it->second; - item.setValue(value); - _properties[item.getName()] = item; - _logger->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str()); - return true; - } - else - { - return false; - } -} - -std::set<Connection *> Processor::getOutGoingConnections(std::string relationship) -{ - std::set<Connection *> empty; - - std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) - { - return _outGoingConnections[relationship]; - } - else - { - return empty; - } -} - -bool Processor::addConnection(Connection *connection) -{ - bool ret = false; - - if (isRunning()) - { - _logger->log_info("Can not add connection while the process %s is running", - _name.c_str()); - return false; - } - - std::lock_guard<std::mutex> lock(_mtx); - - uuid_t srcUUID; - uuid_t destUUID; - - connection->getSourceProcessorUUID(srcUUID); - connection->getDestinationProcessorUUID(destUUID); - - if (uuid_compare(_uuid, destUUID) == 0) - { - // Connection is destination to the current processor - if (_incomingConnections.find(connection) == _incomingConnections.end()) - { - _incomingConnections.insert(connection); - connection->setDestinationProcessor(this); - _logger->log_info("Add connection %s into Processor %s incoming connection", - connection->getName().c_str(), _name.c_str()); - _incomingConnectionsIter = this->_incomingConnections.begin(); - ret = true; - } - } - - if (uuid_compare(_uuid, srcUUID) == 0) - { - std::string relationship = connection->getRelationship().getName(); - // Connection is source from the current processor - std::map<std::string, std::set<Connection *>>::iterator it = - _outGoingConnections.find(relationship); - if (it != _outGoingConnections.end()) - { - // We already has connection for this relationship - std::set<Connection *> existedConnection = it->second; - if (existedConnection.find(connection) == existedConnection.end()) - { - // We do not have the same connection for this relationship yet - existedConnection.insert(connection); - connection->setSourceProcessor(this); - _outGoingConnections[relationship] = existedConnection; - _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - ret = true; - } - } - else - { - // We do not have any outgoing connection for this relationship yet - std::set<Connection *> newConnection; - newConnection.insert(connection); - connection->setSourceProcessor(this); - _outGoingConnections[relationship] = newConnection; - _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - ret = true; - } - } - - return ret; -} - -void Processor::removeConnection(Connection *connection) -{ - if (isRunning()) - { - _logger->log_info("Can not remove connection while the process %s is running", - _name.c_str()); - return; - } - - std::lock_guard<std::mutex> lock(_mtx); - - uuid_t srcUUID; - uuid_t destUUID; - - connection->getSourceProcessorUUID(srcUUID); - connection->getDestinationProcessorUUID(destUUID); - - if (uuid_compare(_uuid, destUUID) == 0) - { - // Connection is destination to the current processor - if (_incomingConnections.find(connection) != _incomingConnections.end()) - { - _incomingConnections.erase(connection); - connection->setDestinationProcessor(NULL); - _logger->log_info("Remove connection %s into Processor %s incoming connection", - connection->getName().c_str(), _name.c_str()); - _incomingConnectionsIter = this->_incomingConnections.begin(); - } - } - - if (uuid_compare(_uuid, srcUUID) == 0) - { - std::string relationship = connection->getRelationship().getName(); - // Connection is source from the current processor - std::map<std::string, std::set<Connection *>>::iterator it = - _outGoingConnections.find(relationship); - if (it == _outGoingConnections.end()) - { - return; - } - else - { - if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end()) - { - _outGoingConnections[relationship].erase(connection); - connection->setSourceProcessor(NULL); - _logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", - connection->getName().c_str(), _name.c_str(), relationship.c_str()); - } - } - } -} - -Connection *Processor::getNextIncomingConnection() -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_incomingConnections.size() == 0) - return NULL; - - if (_incomingConnectionsIter == _incomingConnections.end()) - _incomingConnectionsIter = _incomingConnections.begin(); - - Connection *ret = *_incomingConnectionsIter; - _incomingConnectionsIter++; - - if (_incomingConnectionsIter == _incomingConnections.end()) - _incomingConnectionsIter = _incomingConnections.begin(); - - return ret; -} - -bool Processor::flowFilesQueued() -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_incomingConnections.size() == 0) - return false; - - for (std::set<Connection *>::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it) - { - Connection *connection = *it; - if (connection->getQueueSize() > 0) - return true; - } - - return false; -} - -bool Processor::flowFilesOutGoingFull() -{ - std::lock_guard<std::mutex> lock(_mtx); - - std::map<std::string, std::set<Connection *>>::iterator it; - - for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it) - { - // We already has connection for this relationship - std::set<Connection *> existedConnection = it->second; - for (std::set<Connection *>::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection) - { - Connection *connection = *itConnection; - if (connection->isFull()) - return true; - } - } - - return false; -} - -void Processor::onTrigger() -{ - ProcessContext *context = new ProcessContext(this); - ProcessSession *session = new ProcessSession(context); - try { - // Call the child onTrigger function - this->onTrigger(context, session); - session->commit(); - delete session; - delete context; - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - session->rollback(); - delete session; - delete context; - throw; - } - catch (...) - { - _logger->log_debug("Caught Exception Processor::onTrigger"); - session->rollback(); - delete session; - delete context; - throw; - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/src/RealTimeDataCollector.cpp b/src/RealTimeDataCollector.cpp deleted file mode 100644 index c7118ff..0000000 --- a/src/RealTimeDataCollector.cpp +++ /dev/null @@ -1,482 +0,0 @@ -/** - * @file RealTimeDataCollector.cpp - * RealTimeDataCollector class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <random> -#include <netinet/tcp.h> - -#include "RealTimeDataCollector.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); -Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp"); -Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost"); -Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000"); -Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost"); -Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001"); -Property RealTimeDataCollector::ITERATION("Iteration", - "If true, sample osp file will be iterated", "true"); -Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41"); -Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48"); -Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms"); -Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); -Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); -Relationship RealTimeDataCollector::Success("success", "success operational on the flow record"); - -void RealTimeDataCollector::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FILENAME); - properties.insert(REALTIMESERVERNAME); - properties.insert(REALTIMESERVERPORT); - properties.insert(BATCHSERVERNAME); - properties.insert(BATCHSERVERPORT); - properties.insert(ITERATION); - properties.insert(REALTIMEMSGID); - properties.insert(BATCHMSGID); - properties.insert(REALTIMEINTERVAL); - properties.insert(BATCHINTERVAL); - properties.insert(BATCHMAXBUFFERSIZE); - - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); - -} - -int RealTimeDataCollector::connectServer(const char *host, uint16_t port) -{ - in_addr_t addr; - int sock = 0; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) - { - _logger->log_error("Could not create socket to hostName %s", host); - return 0; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - _logger->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - _logger->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - //TODO bind socket to the interface - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) - { - _logger->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *)&sa, socklen); - - if (status < 0) - { - _logger->log_error("socket connect failed to %s %d", host, port); - close(sock); - return 0; - } - - _logger->log_info("socket %d connect to server %s port %d success", sock, host, port); - - return sock; -} - -int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) -{ - int ret = 0, bytes = 0; - - while (bytes < buflen) - { - ret = send(socket, buf+bytes, buflen-bytes, 0); - //check for errors - if (ret == -1) - { - return ret; - } - bytes+=ret; - } - - if (ret) - _logger->log_debug("Send data size %d over socket %d", buflen, socket); - - return ret; -} - -void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session) -{ - if (_realTimeAccumulated >= this->_realTimeInterval) - { - std::string value; - if (this->getProperty(REALTIMEMSGID.getName(), value)) - { - this->_realTimeMsgID.clear(); - this->_logger->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - this->_realTimeMsgID.push_back(cell); - // this->_logger->log_debug("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) - { - this->_batchMsgID.clear(); - this->_logger->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - cell = Property::trim(cell); - this->_batchMsgID.push_back(cell); - // this->_logger->log_debug("Batch Msg ID %s", cell.c_str()); - } - } - // _logger->log_info("onTriggerRealTime"); - // Open the file - if (!this->_fileStream.is_open()) - { - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (this->_fileStream.is_open()) - _logger->log_debug("open %s", _fileName.c_str()); - } - if (!_fileStream.good()) - { - _logger->log_error("load data file failed %s", _fileName.c_str()); - return; - } - if (this->_fileStream.is_open()) - { - std::string line; - - while (std::getline(_fileStream, line)) - { - line += "\n"; - std::stringstream lineStream(line); - std::string cell; - if (std::getline(lineStream, cell, ',')) - { - cell = Property::trim(cell); - // Check whether it match to the batch traffic - for (std::vector<std::string>::iterator it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it) - { - if (cell == *it) - { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(_mtx); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) - { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - _logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); - } - } - bool findRealTime = false; - // Check whether it match to the real time traffic - for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it) - { - if (cell == *it) - { - int status = 0; - if (this->_realTimeSocket <= 0) - { - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); - } - if (this->_realTimeSocket) - { - // try to send the data - status = sendData(_realTimeSocket, line.data(), line.size()); - if (status < 0) - { - close(_realTimeSocket); - _realTimeSocket = 0; - } - } - if (this->_realTimeSocket <= 0 || status < 0) - { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(_mtx); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) - { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - _logger->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); - } - // find real time - findRealTime = true; - } // cell - } // for real time pattern - if (findRealTime) - // we break the while once we find the first real time - break; - } // if get line - } // while - if (_fileStream.eof()) - { - _fileStream.close(); - } - } // if open - _realTimeAccumulated = 0; - } - _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session) -{ - if (_batchAcccumulated >= this->_batchInterval) - { - // _logger->log_info("onTriggerBatch"); - // dequeue the batch and send over WIFI - int status = 0; - if (this->_batchSocket <= 0) - { - // Connect the WIFI socket - uint16_t port = _batchServerPort; - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - } - if (this->_batchSocket) - { - std::lock_guard<std::mutex> lock(_mtx); - - while (!_queue.empty()) - { - std::string line = _queue.front(); - status = sendData(_batchSocket, line.data(), line.size()); - _queue.pop(); - _queuedDataSize -= line.size(); - if (status < 0) - { - close(_batchSocket); - _batchSocket = 0; - break; - } - } - } - _batchAcccumulated = 0; - } - _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::thread::id id = std::this_thread::get_id(); - - if (id == _realTimeThreadId) - return onTriggerRealTime(context, session); - else if (id == _batchThreadId) - return onTriggerBatch(context, session); - else - { - std::lock_guard<std::mutex> lock(_mtx); - if (!this->_firstInvoking) - { - this->_fileName = "data.osp"; - std::string value; - if (this->getProperty(FILENAME.getName(), value)) - { - this->_fileName = value; - this->_logger->log_info("Data Collector File Name %s", _fileName.c_str()); - } - this->_realTimeServerName = "localhost"; - if (this->getProperty(REALTIMESERVERNAME.getName(), value)) - { - this->_realTimeServerName = value; - this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str()); - } - this->_realTimeServerPort = 10000; - if (this->getProperty(REALTIMESERVERPORT.getName(), value)) - { - Property::StringToInt(value, _realTimeServerPort); - this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort); - } - if (this->getProperty(BATCHSERVERNAME.getName(), value)) - { - this->_batchServerName = value; - this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str()); - } - this->_batchServerPort = 10001; - if (this->getProperty(BATCHSERVERPORT.getName(), value)) - { - Property::StringToInt(value, _batchServerPort); - this->_logger->log_info("Batch Server Port %d", _batchServerPort); - } - if (this->getProperty(ITERATION.getName(), value)) - { - Property::StringToBool(value, this->_iteration); - _logger->log_info("Iteration %d", _iteration); - } - this->_realTimeInterval = 10000000; //10 msec - if (this->getProperty(REALTIMEINTERVAL.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _realTimeInterval, unit) && - Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval)) - { - _logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval); - } - } - this->_batchInterval = 100000000; //100 msec - if (this->getProperty(BATCHINTERVAL.getName(), value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _batchInterval, unit) && - Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval)) - { - _logger->log_info("Batch Time Interval: [%d] ns", _batchInterval); - } - } - this->_batchMaxBufferSize = 256*1024; - if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) - { - Property::StringToInt(value, _batchMaxBufferSize); - this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize); - } - if (this->getProperty(REALTIMEMSGID.getName(), value)) - { - this->_logger->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - this->_realTimeMsgID.push_back(cell); - this->_logger->log_info("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) - { - this->_logger->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while(std::getline(lineStream, cell, ',')) - { - cell = Property::trim(cell); - this->_batchMsgID.push_back(cell); - this->_logger->log_info("Batch Msg ID %s", cell.c_str()); - } - } - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - - this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); - - // Connect the WIFI socket - port = _batchServerPort; - - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - - // Open the file - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (!_fileStream.good()) - { - _logger->log_error("load data file failed %s", _fileName.c_str()); - return; - } - else - { - _logger->log_debug("open %s", _fileName.c_str()); - } - _realTimeThreadId = id; - this->_firstInvoking = true; - } - else - { - if (id != _realTimeThreadId) - _batchThreadId = id; - this->_firstInvoking = false; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp deleted file mode 100644 index 9d849ae..0000000 --- a/src/RemoteProcessorGroupPort.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/** - * @file RemoteProcessorGroupPort.cpp - * RemoteProcessorGroupPort class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <sstream> -#include <string.h> -#include <iostream> - -#include "TimeUtil.h" -#include "RemoteProcessorGroupPort.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); -Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); -Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); -Relationship RemoteProcessorGroupPort::relation; - -void RemoteProcessorGroupPort::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(hostName); - properties.insert(port); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(relation); - setSupportedRelationships(relationships); -} - -void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - - if (!_transmitting) - return; - - std::string host = _peer->getHostName(); - uint16_t sport = _peer->getPort(); - int64_t lvalue; - bool needReset = false; - - if (context->getProperty(hostName.getName(), value)) - { - host = value; - } - if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue)) - { - sport = (uint16_t) lvalue; - } - if (host != _peer->getHostName()) - { - _peer->setHostName(host); - needReset= true; - } - if (sport != _peer->getPort()) - { - _peer->setPort(sport); - needReset = true; - } - if (needReset) - _protocol->tearDown(); - - if (!_protocol->bootstrap()) - { - // bootstrap the client protocol if needeed - context->yield(); - _logger->log_error("Site2Site bootstrap failed yield period %d peer timeout %d", context->getProcessor()->getYieldPeriodMsec(), _protocol->getPeer()->getTimeOut()); - return; - } - - if (_direction == RECEIVE) - _protocol->receiveFlowFiles(context, session); - else - _protocol->transferFlowFiles(context, session); - - return; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/src/ResourceClaim.cpp b/src/ResourceClaim.cpp deleted file mode 100644 index 3c22ac9..0000000 --- a/src/ResourceClaim.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/** - * @file ResourceClaim.cpp - * ResourceClaim class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> - -#include "ResourceClaim.h" - -std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); - -ResourceClaim::ResourceClaim(const std::string contentDirectory) -: _id(_localResourceClaimNumber.load()), - _flowFileRecordOwnedCount(0) -{ - char uuidStr[37]; - - // Generate the global UUID for the resource claim - uuid_generate(_uuid); - // Increase the local ID for the resource claim - ++_localResourceClaimNumber; - uuid_unparse(_uuid, uuidStr); - // Create the full content path for the content - _contentFullPath = contentDirectory + "/" + uuidStr; - - _configure = Configure::getConfigure(); - _logger = Logger::getLogger(); - _logger->log_debug("Resource Claim created %s", _contentFullPath.c_str()); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp deleted file mode 100644 index 211c328..0000000 --- a/src/SchedulingAgent.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/** - * @file SchedulingAgent.cpp - * SchedulingAgent class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <chrono> -#include <thread> -#include <iostream> -#include "Exception.h" -#include "SchedulingAgent.h" - -bool SchedulingAgent::hasWorkToDo(Processor *processor) -{ - // Whether it has work to do - if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || - processor->flowFilesQueued()) - return true; - else - return false; -} - -bool SchedulingAgent::hasTooMuchOutGoing(Processor *processor) -{ - return processor->flowFilesOutGoingFull(); -} - -bool SchedulingAgent::onTrigger(Processor *processor) -{ - if (processor->isYield()) - return false; - - // No need to yield, reset yield expiration to 0 - processor->clearYield(); - - if (!hasWorkToDo(processor)) - // No work to do, yield - return true; - - if(hasTooMuchOutGoing(processor)) - // need to apply backpressure - return true; - - //TODO runDuration - - processor->incrementActiveTasks(); - try - { - processor->onTrigger(); - processor->decrementActiveTask(); - } - catch (Exception &exception) - { - // Normal exception - _logger->log_debug("Caught Exception %s", exception.what()); - processor->decrementActiveTask(); - } - catch (std::exception &exception) - { - _logger->log_debug("Caught Exception %s", exception.what()); - processor->yield(_administrativeYieldDuration); - processor->decrementActiveTask(); - } - catch (...) - { - _logger->log_debug("Caught Exception during SchedulingAgent::onTrigger"); - processor->yield(_administrativeYieldDuration); - processor->decrementActiveTask(); - } - - return false; -} -