http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/inc/ProcessGroup.h b/inc/ProcessGroup.h deleted file mode 100644 index 4dd26f8..0000000 --- a/inc/ProcessGroup.h +++ /dev/null @@ -1,182 +0,0 @@ -/** - * @file ProcessGroup.h - * ProcessGroup class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_GROUP_H__ -#define __PROCESS_GROUP_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> - -#include "Logger.h" -#include "Processor.h" -#include "Exception.h" -#include "TimerDrivenSchedulingAgent.h" - -//! Process Group Type -enum ProcessGroupType -{ - ROOT_PROCESS_GROUP = 0, - REMOTE_PROCESS_GROUP, - MAX_PROCESS_GROUP_TYPE -}; - -//! ProcessGroup Class -class ProcessGroup -{ -public: - //! Constructor - /*! - * Create a new process group - */ - ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid = NULL, ProcessGroup *parent = NULL); - //! Destructor - virtual ~ProcessGroup(); - //! Set Processor Name - void setName(std::string name) { - _name = name; - } - //! Get Process Name - std::string getName(void) { - return (_name); - } - //! Set URL - void setURL(std::string url) { - _url = url; - } - //! Get URL - std::string getURL(void) { - return (_url); - } - //! SetTransmitting - void setTransmitting(bool val) - { - _transmitting = val; - } - //! Get Transmitting - bool getTransmitting() - { - return _transmitting; - } - //! setTimeOut - void setTimeOut(uint64_t time) - { - _timeOut = time; - } - uint64_t getTimeOut() - { - return _timeOut; - } - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - _yieldPeriodMsec = period; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return(_yieldPeriodMsec); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - return false; - } - //! Start Processing - void startProcessing(TimerDrivenSchedulingAgent *timeScheduler); - //! Stop Processing - void stopProcessing(TimerDrivenSchedulingAgent *timeScheduler); - //! Whether it is root process group - bool isRootProcessGroup(); - //! set parent process group - void setParent(ProcessGroup *parent) { - std::lock_guard<std::mutex> lock(_mtx); - _parentProcessGroup = parent; - } - //! get parent process group - ProcessGroup *getParent(void) { - std::lock_guard<std::mutex> lock(_mtx); - return _parentProcessGroup; - } - //! Add processor - void addProcessor(Processor *processor); - //! Remove processor - void removeProcessor(Processor *processor); - //! Add child processor group - void addProcessGroup(ProcessGroup *child); - //! Remove child processor group - void removeProcessGroup(ProcessGroup *child); - // ! Add connections - void addConnection(Connection *connection); - //! findProcessor based on UUID - Processor *findProcessor(uuid_t uuid); - //! findProcessor based on name - Processor *findProcessor(std::string processorName); - //! removeConnection - void removeConnection(Connection *connection); - //! update property value - void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue); - -protected: - //! A global unique identifier - uuid_t _uuid; - //! Processor Group Name - std::string _name; - //! Process Group Type - ProcessGroupType _type; - //! Processors (ProcessNode) inside this process group which include Input/Output Port, Remote Process Group input/Output port - std::set<Processor *> _processors; - std::set<ProcessGroup *> _childProcessGroups; - //! Connections between the processor inside the group; - std::set<Connection *> _connections; - //! Parent Process Group - ProcessGroup* _parentProcessGroup; - //! Yield Period in Milliseconds - std::atomic<uint64_t> _yieldPeriodMsec; - std::atomic<uint64_t> _timeOut; - //! URL - std::string _url; - //! Transmitting - std::atomic<bool> _transmitting; - -private: - - //! Mutex for protection - std::mutex _mtx; - //! Logger - Logger *_logger; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessGroup(const ProcessGroup &parent); - ProcessGroup &operator=(const ProcessGroup &parent); -}; - -#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ProcessSession.h ---------------------------------------------------------------------- diff --git a/inc/ProcessSession.h b/inc/ProcessSession.h deleted file mode 100644 index c8ec3a5..0000000 --- a/inc/ProcessSession.h +++ /dev/null @@ -1,116 +0,0 @@ -/** - * @file ProcessSession.h - * ProcessSession class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_SESSION_H__ -#define __PROCESS_SESSION_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> - -#include "Logger.h" -#include "Processor.h" -#include "ProcessContext.h" -#include "FlowFileRecord.h" -#include "Exception.h" - -//! ProcessSession Class -class ProcessSession -{ -public: - //! Constructor - /*! - * Create a new process session - */ - ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) { - _logger = Logger::getLogger(); - _logger->log_trace("ProcessSession created for %s", _processContext->getProcessor()->getName().c_str()); - } - //! Destructor - virtual ~ProcessSession() {} - //! Commit the session - void commit(); - //! Roll Back the session - void rollback(); - //! - //! Get the FlowFile from the highest priority queue - FlowFileRecord *get(); - //! Create a new UUID FlowFile with no content resource claim and without parent - FlowFileRecord *create(); - //! Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent - FlowFileRecord *create(FlowFileRecord *parent); - //! Clone a new UUID FlowFile from parent both for content resource claim and attributes - FlowFileRecord *clone(FlowFileRecord *parent); - //! Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim - FlowFileRecord *clone(FlowFileRecord *parent, long offset, long size); - //! Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session - FlowFileRecord *duplicate(FlowFileRecord *orignal); - //! Transfer the FlowFile to the relationship - void transfer(FlowFileRecord *flow, Relationship relationship); - //! Put Attribute - void putAttribute(FlowFileRecord *flow, std::string key, std::string value); - //! Remove Attribute - void removeAttribute(FlowFileRecord *flow, std::string key); - //! Remove Flow File - void remove(FlowFileRecord *flow); - //! Execute the given read callback against the content - void read(FlowFileRecord *flow, InputStreamCallback *callback); - //! Execute the given write callback against the content - void write(FlowFileRecord *flow, OutputStreamCallback *callback); - //! Execute the given write/append callback against the content - void append(FlowFileRecord *flow, OutputStreamCallback *callback); - //! Penalize the flow - void penalize(FlowFileRecord *flow); - //! Import the existed file into the flow - void import(std::string source, FlowFileRecord *flow, bool keepSource = true, uint64_t offset = 0); - -protected: - //! FlowFiles being modified by current process session - std::map<std::string, FlowFileRecord *> _updatedFlowFiles; - //! Copy of the original FlowFiles being modified by current process session as above - std::map<std::string, FlowFileRecord *> _originalFlowFiles; - //! FlowFiles being added by current process session - std::map<std::string, FlowFileRecord *> _addedFlowFiles; - //! FlowFiles being deleted by current process session - std::map<std::string, FlowFileRecord *> _deletedFlowFiles; - //! FlowFiles being transfered to the relationship - std::map<std::string, Relationship> _transferRelationship; - //! FlowFiles being cloned for multiple connections per relationship - std::map<std::string, FlowFileRecord *> _clonedFlowFiles; - -private: - // Clone the flow file during transfer to multiple connections for a relationship - FlowFileRecord* cloneDuringTransfer(FlowFileRecord *parent); - //! ProcessContext - ProcessContext *_processContext; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessSession(const ProcessSession &parent); - ProcessSession &operator=(const ProcessSession &parent); - //! Logger - Logger *_logger; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Processor.h ---------------------------------------------------------------------- diff --git a/inc/Processor.h b/inc/Processor.h deleted file mode 100644 index db26ad0..0000000 --- a/inc/Processor.h +++ /dev/null @@ -1,346 +0,0 @@ -/** - * @file Processor.h - * Processor class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESSOR_H__ -#define __PROCESSOR_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> - -#include "TimeUtil.h" -#include "Property.h" -#include "Relationship.h" -#include "Connection.h" - -//! Forwarder declaration -class ProcessContext; -class ProcessSession; - -//! Minimum scheduling period in Nano Second -#define MINIMUM_SCHEDULING_NANOS 30000 - -//! Default yield period in second -#define DEFAULT_YIELD_PERIOD_SECONDS 1 - -//! Default penalization period in second -#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30 - -/*! - * Indicates the valid values for the state of a entity - * with respect to scheduling the entity to run. - */ -enum ScheduledState { - - /** - * Entity cannot be scheduled to run - */ - DISABLED, - /** - * Entity can be scheduled to run but currently is not - */ - STOPPED, - /** - * Entity is currently scheduled to run - */ - RUNNING -}; - -/*! - * Scheduling Strategy - */ -enum SchedulingStrategy { - //! Event driven - EVENT_DRIVEN, - //! Timer driven - TIMER_DRIVEN, - //! Cron Driven - CRON_DRIVEN -}; - -//! Processor Class -class Processor -{ - friend class ProcessContext; -public: - //! Constructor - /*! - * Create a new processor - */ - Processor(std::string name, uuid_t uuid = NULL); - //! Destructor - virtual ~Processor(); - //! Set Processor Name - void setName(std::string name) { - _name = name; - } - //! Get Process Name - std::string getName(void) { - return (_name); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - char uuidStr[37]; - uuid_unparse(_uuid, uuidStr); - _uuidStr = uuidStr; - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - return false; - } - //! Set the supported processor properties while the process is not running - bool setSupportedProperties(std::set<Property> properties); - //! Set the supported relationships while the process is not running - bool setSupportedRelationships(std::set<Relationship> relationships); - //! Get the supported property value by name - bool getProperty(std::string name, std::string &value); - //! Set the supported property value by name wile the process is not running - bool setProperty(std::string name, std::string value); - //! Whether the relationship is supported - bool isSupportedRelationship(Relationship relationship); - //! Set the auto terminated relationships while the process is not running - bool setAutoTerminatedRelationships(std::set<Relationship> relationships); - //! Check whether the relationship is auto terminated - bool isAutoTerminated(Relationship relationship); - //! Check whether the processor is running - bool isRunning(); - //! Set Processor Scheduled State - void setScheduledState(ScheduledState state) { - _state = state; - } - //! Get Processor Scheduled State - ScheduledState getScheduledState(void) { - return _state; - } - //! Set Processor Scheduling Strategy - void setSchedulingStrategy(SchedulingStrategy strategy) { - _strategy = strategy; - } - //! Get Processor Scheduling Strategy - SchedulingStrategy getSchedulingStrategy(void) { - return _strategy; - } - //! Set Processor Loss Tolerant - void setlossTolerant(bool lossTolerant) { - _lossTolerant = lossTolerant; - } - //! Get Processor Loss Tolerant - bool getlossTolerant(void) { - return _lossTolerant; - } - //! Set Processor Scheduling Period in Nano Second - void setSchedulingPeriodNano(uint64_t period) { - uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS; - _schedulingPeriodNano = std::max(period, minPeriod); - } - //! Get Processor Scheduling Period in Nano Second - uint64_t getSchedulingPeriodNano(void) { - return _schedulingPeriodNano; - } - //! Set Processor Run Duration in Nano Second - void setRunDurationNano(uint64_t period) { - _runDurantionNano = period; - } - //! Get Processor Run Duration in Nano Second - uint64_t getRunDurationNano(void) { - return(_runDurantionNano); - } - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - _yieldPeriodMsec = period; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return(_yieldPeriodMsec); - } - //! Set Processor penalization period in MilliSecond - void setPenalizationPeriodMsec(uint64_t period) { - _penalizationPeriodMsec = period; - } - //! Get Processor penalization period in MilliSecond - uint64_t getPenalizationPeriodMsec(void) { - return(_penalizationPeriodMsec); - } - //! Set Processor Maximum Concurrent Tasks - void setMaxConcurrentTasks(uint8_t tasks) { - _maxConcurrentTasks = tasks; - } - //! Get Processor Maximum Concurrent Tasks - uint8_t getMaxConcurrentTasks(void) { - return(_maxConcurrentTasks); - } - //! Set Trigger when empty - void setTriggerWhenEmpty(bool value) { - _triggerWhenEmpty = value; - } - //! Get Trigger when empty - bool getTriggerWhenEmpty(void) { - return(_triggerWhenEmpty); - } - //! Get Active Task Counts - uint8_t getActiveTasks(void) { - return(_activeTasks); - } - //! Increment Active Task Counts - void incrementActiveTasks(void) { - _activeTasks++; - } - //! decrement Active Task Counts - void decrementActiveTask(void) { - _activeTasks--; - } - void clearActiveTask(void) { - _activeTasks = 0; - } - //! Yield based on the yield period - void yield() - { - _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - } - //! Yield based on the input time - void yield(uint64_t time) - { - _yieldExpiration = (getTimeMillis() + time); - } - //! whether need be to yield - bool isYield() - { - if (_yieldExpiration > 0) - return (_yieldExpiration >= getTimeMillis()); - else - return false; - } - // clear yield expiration - void clearYield() - { - _yieldExpiration = 0; - } - // get yield time - uint64_t getYieldTime() - { - uint64_t curTime = getTimeMillis(); - if (_yieldExpiration > curTime) - return (_yieldExpiration - curTime); - else - return 0;; - } - //! Whether flow file queued in incoming connection - bool flowFilesQueued(); - //! Whether flow file queue full in any of the outgoin connection - bool flowFilesOutGoingFull(); - //! Get incoming connections - std::set<Connection *> getIncomingConnections() { - return _incomingConnections; - } - //! Has Incoming Connection - bool hasIncomingConnections() { - return (_incomingConnections.size() > 0); - } - //! Get outgoing connections based on relationship name - std::set<Connection *> getOutGoingConnections(std::string relationship); - //! Add connection - bool addConnection(Connection *connection); - //! Remove connection - void removeConnection(Connection *connection); - //! Get the UUID as string - std::string getUUIDStr() { - return _uuidStr; - } - //! Get the Next RoundRobin incoming connection - Connection *getNextIncomingConnection(); - //! On Trigger - void onTrigger(); - -public: - //! OnTrigger method, implemented by NiFi Processor Designer - virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0; - //! Initialize, over write by NiFi Process Designer - virtual void initialize(void) { - return; - } - -protected: - - //! A global unique identifier - uuid_t _uuid; - //! Processor Name - std::string _name; - //! Supported properties - std::map<std::string, Property> _properties; - //! Supported relationships - std::map<std::string, Relationship> _relationships; - //! Autoterminated relationships - std::map<std::string, Relationship> _autoTerminatedRelationships; - //! Processor state - std::atomic<ScheduledState> _state; - //! Scheduling Strategy - std::atomic<SchedulingStrategy> _strategy; - //! lossTolerant - std::atomic<bool> _lossTolerant; - //! SchedulePeriod in Nano Seconds - std::atomic<uint64_t> _schedulingPeriodNano; - //! Run Duration in Nano Seconds - std::atomic<uint64_t> _runDurantionNano; - //! Yield Period in Milliseconds - std::atomic<uint64_t> _yieldPeriodMsec; - //! Penalization Period in MilliSecond - std::atomic<uint64_t> _penalizationPeriodMsec; - //! Maximum Concurrent Tasks - std::atomic<uint8_t> _maxConcurrentTasks; - //! Active Tasks - std::atomic<uint8_t> _activeTasks; - //! Trigger the Processor even if the incoming connection is empty - std::atomic<bool> _triggerWhenEmpty; - //! Incoming connections - std::set<Connection *> _incomingConnections; - //! Outgoing connections map based on Relationship name - std::map<std::string, std::set<Connection *>> _outGoingConnections; - //! UUID string - std::string _uuidStr; - -private: - - //! Mutex for protection - std::mutex _mtx; - //! Yield Expiration - std::atomic<uint64_t> _yieldExpiration; - //! Incoming connection Iterator - std::set<Connection *>::iterator _incomingConnectionsIter; - //! Logger - Logger *_logger; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Processor(const Processor &parent); - Processor &operator=(const Processor &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Property.h ---------------------------------------------------------------------- diff --git a/inc/Property.h b/inc/Property.h deleted file mode 100644 index a724394..0000000 --- a/inc/Property.h +++ /dev/null @@ -1,344 +0,0 @@ -/** - * @file Property.h - * Processor Property class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROPERTY_H__ -#define __PROPERTY_H__ - -#include <string> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <set> -#include <stdlib.h> -#include <math.h> - -//! Time Unit -enum TimeUnit { - DAY, - HOUR, - MINUTE, - SECOND, - MILLISECOND, - NANOSECOND -}; - -//! Property Class -class Property { - -public: - //! Constructor - /*! - * Create a new property - */ - Property(const std::string name, const std::string description, const std::string value) - : _name(name), _description(description), _value(value) { - } - Property() {} - //! Destructor - virtual ~Property() {} - //! Get Name for the property - std::string getName() { - return _name; - } - //! Get Description for the property - std::string getDescription() { - return _description; - } - //! Get value for the property - std::string getValue() { - return _value; - } - //! Set value for the property - void setValue(std::string value) { - _value = value; - } - //! Compare - bool operator < (const Property & right) const { - return _name < right._name; - } - - //! Convert TimeUnit to MilliSecond - static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) - { - if (unit == MILLISECOND) - { - out = input; - return true; - } - else if (unit == SECOND) - { - out = input * 1000; - return true; - } - else if (unit == MINUTE) - { - out = input * 60 * 1000; - return true; - } - else if (unit == HOUR) - { - out = input * 60 * 60 * 1000; - return true; - } - else if (unit == DAY) - { - out = 24 * 60 * 60 * 1000; - return true; - } - else if (unit == NANOSECOND) - { - out = input/1000/1000; - return true; - } - else - { - return false; - } - } - //! Convert TimeUnit to NanoSecond - static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) - { - if (unit == MILLISECOND) - { - out = input * 1000 * 1000; - return true; - } - else if (unit == SECOND) - { - out = input * 1000 * 1000 * 1000; - return true; - } - else if (unit == MINUTE) - { - out = input * 60 * 1000 * 1000 * 1000; - return true; - } - else if (unit == HOUR) - { - out = input * 60 * 60 * 1000 * 1000 * 1000; - return true; - } - else if (unit == NANOSECOND) - { - out = input; - return true; - } - else - { - return false; - } - } - //! Convert String - static bool StringToTime(std::string input, int64_t &output, TimeUnit &timeunit) - { - if (input.size() == 0) { - return false; - } - - const char *cvalue = input.c_str(); - char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); - - if (pEnd[0] == '\0') - { - return false; - } - - while (*pEnd == ' ') - { - // Skip the space - pEnd++; - } - - std::string unit(pEnd); - - if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds" || unit == "secs") - { - timeunit = SECOND; - output = ival; - return true; - } - else if (unit == "min" || unit == "m" || unit == "mins" || unit == "minute" || unit == "minutes") - { - timeunit = MINUTE; - output = ival; - return true; - } - else if (unit == "ns" || unit == "nano" || unit == "nanos" || unit == "nanoseconds") - { - timeunit = NANOSECOND; - output = ival; - return true; - } - else if (unit == "ms" || unit == "milli" || unit == "millis" || unit == "milliseconds") - { - timeunit = MILLISECOND; - output = ival; - return true; - } - else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs" || unit == "hours") - { - timeunit = HOUR; - output = ival; - return true; - } - else if (unit == "d" || unit == "day" || unit == "days") - { - timeunit = DAY; - output = ival; - return true; - } - else - return false; - } - - //! Convert String to Integer - static bool StringToInt(std::string input, int64_t &output) - { - if (input.size() == 0) { - return false; - } - - const char *cvalue = input.c_str(); - char *pEnd; - long int ival = strtol(cvalue, &pEnd, 0); - - if (pEnd[0] == '\0') - { - output = ival; - return true; - } - - while (*pEnd == ' ') - { - // Skip the space - pEnd++; - } - - char end0 = toupper(pEnd[0]); - if ( (end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T') || (end0 == 'P') ) - { - if (pEnd[1] == '\0') - { - unsigned long int multiplier = 1000; - - if ( (end0 != 'K') ) { - multiplier *= 1000; - if (end0 != 'M') { - multiplier *= 1000; - if (end0 != 'G') { - multiplier *= 1000; - if (end0 != 'T') { - multiplier *= 1000; - } - } - } - } - output = ival * multiplier; - return true; - - } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) { - - unsigned long int multiplier = 1024; - - if ( (end0 != 'K') ) { - multiplier *= 1024; - if (end0 != 'M') { - multiplier *= 1024; - if (end0 != 'G') { - multiplier *= 1024; - if (end0 != 'T') { - multiplier *= 1024; - } - } - } - } - output = ival * multiplier; - return true; - } - } - - return false; - } - //! Convert String to Float - static bool StringToFloat(std::string input, float &output) - { - const char *cvalue = input.c_str(); - char *pEnd; - float val = strtof(cvalue, &pEnd); - - if (pEnd[0] == '\0') - { - output = val; - return true; - } - else - return false; - } - //! Convert String to Bool - static bool StringToBool(std::string input, bool &output) - { - if (input == "true" || input == "True" || input == "TRUE") - { - output = true; - return true; - } - if (input == "false" || input == "False" || input == "FALSE") - { - output = false; - return true; - } - return false; - } - - // Trim String utils - static std::string trim(const std::string& s) - { - return trimRight(trimLeft(s)); - } - - static std::string trimLeft(const std::string& s) - { - const char *WHITESPACE = " \n\r\t"; - size_t startpos = s.find_first_not_of(WHITESPACE); - return (startpos == std::string::npos) ? "" : s.substr(startpos); - } - - static std::string trimRight(const std::string& s) - { - const char *WHITESPACE = " \n\r\t"; - size_t endpos = s.find_last_not_of(WHITESPACE); - return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1); - } - -protected: - //! Name - std::string _name; - //! Description - std::string _description; - //! Value - std::string _value; - -private: - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/RealTimeDataCollector.h ---------------------------------------------------------------------- diff --git a/inc/RealTimeDataCollector.h b/inc/RealTimeDataCollector.h deleted file mode 100644 index 760b566..0000000 --- a/inc/RealTimeDataCollector.h +++ /dev/null @@ -1,131 +0,0 @@ -/** - * @file RealTimeDataCollector.h - * RealTimeDataCollector class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __REAL_TIME_DATA_COLLECTOR_H__ -#define __REAL_TIME_DATA_COLLECTOR_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! RealTimeDataCollector Class -class RealTimeDataCollector : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - RealTimeDataCollector(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _realTimeSocket = 0; - _batchSocket = 0; - _logger = Logger::getLogger(); - _firstInvoking = false; - _realTimeAccumulated = 0; - _batchAcccumulated = 0; - _queuedDataSize = 0; - } - //! Destructor - virtual ~RealTimeDataCollector() - { - if (_realTimeSocket) - close(_realTimeSocket); - if (_batchSocket) - close(_batchSocket); - if (_fileStream.is_open()) - _fileStream.close(); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property REALTIMESERVERNAME; - static Property REALTIMESERVERPORT; - static Property BATCHSERVERNAME; - static Property BATCHSERVERPORT; - static Property FILENAME; - static Property ITERATION; - static Property REALTIMEMSGID; - static Property BATCHMSGID; - static Property REALTIMEINTERVAL; - static Property BATCHINTERVAL; - static Property BATCHMAXBUFFERSIZE; - //! Supported Relationships - static Relationship Success; - //! Connect to the socket - int connectServer(const char *host, uint16_t port); - int sendData(int socket, const char *buf, int buflen); - void onTriggerRealTime(ProcessContext *context, ProcessSession *session); - void onTriggerBatch(ProcessContext *context, ProcessSession *session); - -public: - //! OnTrigger method, implemented by NiFi RealTimeDataCollector - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi RealTimeDataCollector - virtual void initialize(void); - -protected: - -private: - //! realtime server Name - std::string _realTimeServerName; - int64_t _realTimeServerPort; - std::string _batchServerName; - int64_t _batchServerPort; - int64_t _realTimeInterval; - int64_t _batchInterval; - int64_t _batchMaxBufferSize; - //! Match pattern for Real time Message ID - std::vector<std::string> _realTimeMsgID; - //! Match pattern for Batch Message ID - std::vector<std::string> _batchMsgID; - //! file for which the realTime collector will tail - std::string _fileName; - //! Whether we need to iterate from the beginning for demo - bool _iteration; - int _realTimeSocket; - int _batchSocket; - //! Logger - Logger *_logger; - //! Mutex for protection - std::mutex _mtx; - //! Queued data size - uint64_t _queuedDataSize; - //! Queue for the batch process - std::queue<std::string> _queue; - std::thread::id _realTimeThreadId; - std::thread::id _batchThreadId; - std::atomic<bool> _firstInvoking; - int64_t _realTimeAccumulated; - int64_t _batchAcccumulated; - std::ifstream _fileStream; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Relationship.h ---------------------------------------------------------------------- diff --git a/inc/Relationship.h b/inc/Relationship.h deleted file mode 100644 index 3454ee5..0000000 --- a/inc/Relationship.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @file Relationship.h - * Relationship class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __RELATIONSHIP_H__ -#define __RELATIONSHIP_H__ - -#include <string> -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> - -//! undefined relationship for remote process group outgoing port and root process group incoming port -#define UNDEFINED_RELATIONSHIP "undefined" - -inline bool isRelationshipNameUndefined(std::string name) -{ - if (name == UNDEFINED_RELATIONSHIP) - return true; - else - return false; -} - -//! Relationship Class -class Relationship { - -public: - //! Constructor - /*! - * Create a new relationship - */ - Relationship(const std::string name, const std::string description) - : _name(name), _description(description) { - } - Relationship() - : _name(UNDEFINED_RELATIONSHIP) { - } - //! Destructor - virtual ~Relationship() { - } - //! Get Name for the relationship - std::string getName() { - return _name; - } - //! Get Description for the relationship - std::string getDescription() { - return _description; - } - //! Compare - bool operator < (const Relationship & right) const { - return _name < right._name; - } - //! Whether it is a undefined relationship - bool isRelationshipUndefined() - { - return isRelationshipNameUndefined(_name); - } - -protected: - - //! Name - std::string _name; - //! Description - std::string _description; - -private: -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/inc/RemoteProcessorGroupPort.h b/inc/RemoteProcessorGroupPort.h deleted file mode 100644 index cd99e50..0000000 --- a/inc/RemoteProcessorGroupPort.h +++ /dev/null @@ -1,96 +0,0 @@ -/** - * @file RemoteProcessorGroupPort.h - * RemoteProcessorGroupPort class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __REMOTE_PROCESSOR_GROUP_PORT_H__ -#define __REMOTE_PROCESSOR_GROUP_PORT_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" -#include "Site2SiteClientProtocol.h" - -//! RemoteProcessorGroupPort Class -class RemoteProcessorGroupPort : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - RemoteProcessorGroupPort(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - _peer = new Site2SitePeer("", 9999); - _protocol = new Site2SiteClientProtocol(_peer); - _protocol->setPortId(uuid); - } - //! Destructor - virtual ~RemoteProcessorGroupPort() - { - delete _protocol; - delete _peer; - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property hostName; - static Property port; - //! Supported Relationships - static Relationship relation; -public: - //! OnTrigger method, implemented by NiFi RemoteProcessorGroupPort - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi RemoteProcessorGroupPort - virtual void initialize(void); - //! Set Direction - void setDirection(TransferDirection direction) - { - _direction = direction; - if (_direction == RECEIVE) - this->setTriggerWhenEmpty(true); - } - //! Set Timeout - void setTimeOut(uint64_t timeout) - { - _protocol->setTimeOut(timeout); - } - //! SetTransmitting - void setTransmitting(bool val) - { - _transmitting = val; - } - -protected: - -private: - //! Logger - Logger *_logger; - //! Peer Connection - Site2SitePeer *_peer; - //! Peer Protocol - Site2SiteClientProtocol *_protocol; - //! Transaction Direction - TransferDirection _direction; - //! Transmitting - bool _transmitting; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ResourceClaim.h ---------------------------------------------------------------------- diff --git a/inc/ResourceClaim.h b/inc/ResourceClaim.h deleted file mode 100644 index d8f9979..0000000 --- a/inc/ResourceClaim.h +++ /dev/null @@ -1,92 +0,0 @@ -/** - * @file ResourceClaim.h - * Resource Claim class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __RESOURCE_CLAIM_H__ -#define __RESOURCE_CLAIM_H__ - -#include <string> -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include "Configure.h" - -//! Default content directory -#define DEFAULT_CONTENT_DIRECTORY "." - -//! ResourceClaim Class -class ResourceClaim { - -public: - //! Constructor - /*! - * Create a new resource claim - */ - ResourceClaim(const std::string contentDirectory); - //! Destructor - virtual ~ResourceClaim() {} - //! increaseFlowFileRecordOwnedCount - void increaseFlowFileRecordOwnedCount() - { - ++_flowFileRecordOwnedCount; - } - //! decreaseFlowFileRecordOwenedCount - void decreaseFlowFileRecordOwnedCount() - { - --_flowFileRecordOwnedCount; - } - //! getFlowFileRecordOwenedCount - uint64_t getFlowFileRecordOwnedCount() - { - return _flowFileRecordOwnedCount; - } - //! Get the content full path - std::string getContentFullPath() - { - return _contentFullPath; - } - -protected: - //! A global unique identifier - uuid_t _uuid; - //! A local unique identifier - uint64_t _id; - //! Full path to the content - std::string _contentFullPath; - - //! How many FlowFileRecord Own this cliam - std::atomic<uint64_t> _flowFileRecordOwnedCount; - -private: - //! Configure - Configure *_configure; - //! Logger - Logger *_logger; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ResourceClaim(const ResourceClaim &parent); - ResourceClaim &operator=(const ResourceClaim &parent); - - //! Local resource claim number - static std::atomic<uint64_t> _localResourceClaimNumber; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/inc/SchedulingAgent.h b/inc/SchedulingAgent.h deleted file mode 100644 index 2e3f6b8..0000000 --- a/inc/SchedulingAgent.h +++ /dev/null @@ -1,98 +0,0 @@ -/** - * @file SchedulingAgent.h - * SchedulingAgent class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __SCHEDULING_AGENT_H__ -#define __SCHEDULING_AGENT_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <thread> -#include "TimeUtil.h" -#include "Logger.h" -#include "Configure.h" -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessContext.h" - -//! SchedulingAgent Class -class SchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - SchedulingAgent() { - _configure = Configure::getConfigure(); - _logger = Logger::getLogger(); - _running = false; - } - //! Destructor - virtual ~SchedulingAgent() - { - - } - //! onTrigger, return whether the yield is need - bool onTrigger(Processor *processor); - //! Whether agent has work to do - bool hasWorkToDo(Processor *processor); - //! Whether the outgoing need to be backpressure - bool hasTooMuchOutGoing(Processor *processor); - //! start - void start() { - _running = true; - } - //! stop - void stop() { - _running = false; - } - -public: - //! schedule, overwritten by different DrivenSchedulingAgent - virtual void schedule(Processor *processor) = 0; - //! unschedule, overwritten by different DrivenSchedulingAgent - virtual void unschedule(Processor *processor) = 0; - -protected: - //! Logger - Logger *_logger; - //! Configure - Configure *_configure; - //! Mutex for protection - std::mutex _mtx; - //! Whether it is running - std::atomic<bool> _running; - //! AdministrativeYieldDuration - int64_t _administrativeYieldDuration; - //! BoredYieldDuration - int64_t _boredYieldDuration; - -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - SchedulingAgent(const SchedulingAgent &parent); - SchedulingAgent &operator=(const SchedulingAgent &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/inc/Site2SiteClientProtocol.h b/inc/Site2SiteClientProtocol.h deleted file mode 100644 index 5b72b11..0000000 --- a/inc/Site2SiteClientProtocol.h +++ /dev/null @@ -1,638 +0,0 @@ -/** - * @file Site2SiteClientProtocol.h - * Site2SiteClientProtocol class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__ -#define __SITE2SITE_CLIENT_PROTOCOL_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include <chrono> -#include <thread> -#include <algorithm> -#include <uuid/uuid.h> -#include "Logger.h" -#include "Configure.h" -#include "Property.h" -#include "Site2SitePeer.h" -#include "FlowFileRecord.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -//! Resource Negotiated Status Code -#define RESOURCE_OK 20 -#define DIFFERENT_RESOURCE_VERSION 21 -#define NEGOTIATED_ABORT 255 -// ! Max attributes -#define MAX_NUM_ATTRIBUTES 25000 - -/** - * An enumeration for specifying the direction in which data should be - * transferred between a client and a remote NiFi instance. - */ -typedef enum { - /** - * * The client is to send data to the remote instance. - * */ - SEND, - /** - * * The client is to receive data from the remote instance. - * */ - RECEIVE -} TransferDirection; - - -//! Peer State -typedef enum { - /** - * * IDLE - * */ - IDLE = 0, - /** - * * Socket Established - * */ - ESTABLISHED, - /** - * * HandShake Done - * */ - HANDSHAKED, - /** - * * After CodeDec Completion - * */ - READY -} PeerState; - -//! Transaction State -typedef enum { - /** - * * Transaction has been started but no data has been sent or received. - * */ - TRANSACTION_STARTED, - /** - * * Transaction has been started and data has been sent or received. - * */ - DATA_EXCHANGED, - /** - * * Data that has been transferred has been confirmed via its CRC. - * * Transaction is ready to be completed. - * */ - TRANSACTION_CONFIRMED, - /** - * * Transaction has been successfully completed. - * */ - TRANSACTION_COMPLETED, - /** - * * The Transaction has been canceled. - * */ - TRANSACTION_CANCELED, - /** - * * The Transaction ended in an error. - * */ - TRANSACTION_ERROR -} TransactionState; - -//! Request Type -typedef enum { - NEGOTIATE_FLOWFILE_CODEC = 0, - REQUEST_PEER_LIST, - SEND_FLOWFILES, - RECEIVE_FLOWFILES, - SHUTDOWN, - MAX_REQUEST_TYPE -} RequestType; - -//! Request Type Str -static const char *RequestTypeStr[MAX_REQUEST_TYPE] = -{ - "NEGOTIATE_FLOWFILE_CODEC", - "REQUEST_PEER_LIST", - "SEND_FLOWFILES", - "RECEIVE_FLOWFILES", - "SHUTDOWN" -}; - -//! Respond Code -typedef enum { - RESERVED = 0, - // ResponseCode, so that we can indicate a 0 followed by some other bytes - - // handshaking properties - PROPERTIES_OK = 1, - UNKNOWN_PROPERTY_NAME = 230, - ILLEGAL_PROPERTY_VALUE = 231, - MISSING_PROPERTY = 232, - // transaction indicators - CONTINUE_TRANSACTION = 10, - FINISH_TRANSACTION = 11, - CONFIRM_TRANSACTION = 12, // "Explanation" of this code is the checksum - TRANSACTION_FINISHED = 13, - TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14, - CANCEL_TRANSACTION = 15, - BAD_CHECKSUM = 19, - // data availability indicators - MORE_DATA = 20, - NO_MORE_DATA = 21, - // port state indicators - UNKNOWN_PORT = 200, - PORT_NOT_IN_VALID_STATE = 201, - PORTS_DESTINATION_FULL = 202, - // authorization - UNAUTHORIZED = 240, - // error indicators - ABORT = 250, - UNRECOGNIZED_RESPONSE_CODE = 254, - END_OF_STREAM = 255 -} RespondCode; - -//! Respond Code Class -typedef struct { - RespondCode code; - const char *description; - bool hasDescription; -} RespondCodeContext; - -//! Respond Code Context -static RespondCodeContext respondCodeContext[] = -{ - {RESERVED, "Reserved for Future Use", false}, - {PROPERTIES_OK, "Properties OK", false}, - {UNKNOWN_PROPERTY_NAME, "Unknown Property Name", true}, - {ILLEGAL_PROPERTY_VALUE, "Illegal Property Value", true}, - {MISSING_PROPERTY, "Missing Property", true}, - {CONTINUE_TRANSACTION, "Continue Transaction", false}, - {FINISH_TRANSACTION, "Finish Transaction", false}, - {CONFIRM_TRANSACTION, "Confirm Transaction", true}, - {TRANSACTION_FINISHED, "Transaction Finished", false}, - {TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false}, - {CANCEL_TRANSACTION, "Cancel Transaction", true}, - {BAD_CHECKSUM, "Bad Checksum", false}, - {MORE_DATA, "More Data Exists", false}, - {NO_MORE_DATA, "No More Data Exists", false}, - {UNKNOWN_PORT, "Unknown Port", false}, - {PORT_NOT_IN_VALID_STATE, "Port Not in a Valid State", true}, - {PORTS_DESTINATION_FULL, "Port's Destination is Full", false}, - {UNAUTHORIZED, "User Not Authorized", true}, - {ABORT, "Abort", true}, - {UNRECOGNIZED_RESPONSE_CODE, "Unrecognized Response Code", false}, - {END_OF_STREAM, "End of Stream", false} -}; - -//! Respond Code Sequence Pattern -static const uint8_t CODE_SEQUENCE_VALUE_1 = (uint8_t) 'R'; -static const uint8_t CODE_SEQUENCE_VALUE_2 = (uint8_t) 'C'; - -/** - * Enumeration of Properties that can be used for the Site-to-Site Socket - * Protocol. - */ -typedef enum { - /** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ - GZIP, - /** - * The unique identifier of the port to communicate with - */ - PORT_IDENTIFIER, - /** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ - REQUEST_EXPIRATION_MILLIS, - /** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ - BATCH_COUNT, - /** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ - BATCH_SIZE, - /** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ - BATCH_DURATION, - MAX_HANDSHAKE_PROPERTY -} HandshakeProperty; - -//! HandShakeProperty Str -static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = -{ - /** - * Boolean value indicating whether or not the contents of a FlowFile should - * be GZipped when transferred. - */ - "GZIP", - /** - * The unique identifier of the port to communicate with - */ - "PORT_IDENTIFIER", - /** - * Indicates the number of milliseconds after the request was made that the - * client will wait for a response. If no response has been received by the - * time this value expires, the server can move on without attempting to - * service the request because the client will have already disconnected. - */ - "REQUEST_EXPIRATION_MILLIS", - /** - * The preferred number of FlowFiles that the server should send to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. - */ - "BATCH_COUNT", - /** - * The preferred number of bytes that the server should send to the client - * when pulling data. This property was introduced in version 5 of the - * protocol. - */ - "BATCH_SIZE", - /** - * The preferred amount of time that the server should send data to the - * client when pulling data. This property was introduced in version 5 of - * the protocol. Value is in milliseconds. - */ - "BATCH_DURATION" -}; - -class Site2SiteClientProtocol; - -//! Transaction Class -class Transaction -{ - friend class Site2SiteClientProtocol; -public: - //! Constructor - /*! - * Create a new transaction - */ - Transaction(TransferDirection direction) { - _state = TRANSACTION_STARTED; - _direction = direction; - _dataAvailable = false; - _transfers = 0; - _bytes = 0; - - char uuidStr[37]; - - // Generate the global UUID for the transaction - uuid_generate(_uuid); - uuid_unparse(_uuid, uuidStr); - _uuidStr = uuidStr; - } - //! Destructor - virtual ~Transaction() - { - } - //! getUUIDStr - std::string getUUIDStr() - { - return _uuidStr; - } - //! getState - TransactionState getState() - { - return _state; - } - //! isDataAvailable - bool isDataAvailable() - { - return _dataAvailable; - } - //! setDataAvailable() - void setDataAvailable(bool value) - { - _dataAvailable = value; - } - //! getDirection - TransferDirection getDirection() - { - return _direction; - } - //! getCRC - long getCRC() - { - return _crc.getCRC(); - } - //! updateCRC - void updateCRC(uint8_t *buffer, uint32_t length) - { - _crc.update(buffer, length); - } - -protected: - -private: - //! Transaction State - TransactionState _state; - //! Transaction Direction - TransferDirection _direction; - //! Whether received data is available - bool _dataAvailable; - //! A global unique identifier - uuid_t _uuid; - //! UUID string - std::string _uuidStr; - //! Number of transfer - int _transfers; - //! Number of content bytes - uint64_t _bytes; - //! CRC32 - CRC32 _crc; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Transaction(const Transaction &parent); - Transaction &operator=(const Transaction &parent); -}; - -/** - * Represents a piece of data that is to be sent to or that was received from a - * NiFi instance. - */ -class DataPacket -{ -public: - DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, - std::map<std::string, std::string> attributes) { - _protocol = protocol; - _size = 0; - _transaction = transaction; - _attributes = attributes; - } - std::map<std::string, std::string> _attributes; - uint64_t _size; - Site2SiteClientProtocol *_protocol; - Transaction *_transaction; -}; - -//! Site2SiteClientProtocol Class -class Site2SiteClientProtocol -{ -public: - //! Constructor - /*! - * Create a new control protocol - */ - Site2SiteClientProtocol(Site2SitePeer *peer) { - _logger = Logger::getLogger(); - _configure = Configure::getConfigure(); - _peer = peer; - _batchSize = 0; - _batchCount = 0; - _batchDuration = 0; - _batchSendNanos = 5000000000; // 5 seconds - _timeOut = 30000; // 30 seconds - _peerState = IDLE; - _supportedVersion[0] = 5; - _supportedVersion[1] = 4; - _supportedVersion[2] = 3; - _supportedVersion[3] = 2; - _supportedVersion[4] = 1; - _currentVersion = _supportedVersion[0]; - _currentVersionIndex = 0; - _supportedCodecVersion[0] = 1; - _currentCodecVersion = _supportedCodecVersion[0]; - _currentCodecVersionIndex = 0; - } - //! Destructor - virtual ~Site2SiteClientProtocol() - { - } - -public: - //! setBatchSize - void setBatchSize(uint64_t size) - { - _batchSize = size; - } - //! setBatchCount - void setBatchCount(uint64_t count) - { - _batchCount = count; - } - //! setBatchDuration - void setBatchDuration(uint64_t duration) - { - _batchDuration = duration; - } - //! setTimeOut - void setTimeOut(uint64_t time) - { - _timeOut = time; - if (_peer) - _peer->setTimeOut(time); - - } - //! getTimeout - uint64_t getTimeOut() - { - return _timeOut; - } - //! setPortId - void setPortId(uuid_t id) - { - uuid_copy(_portId, id); - char idStr[37]; - uuid_unparse(id, idStr); - _portIdStr = idStr; - } - //! getResourceName - std::string getResourceName() - { - return "SocketFlowFileProtocol"; - } - //! getCodecResourceName - std::string getCodecResourceName() - { - return "StandardFlowFileCodec"; - } - //! bootstrap the protocol to the ready for transaction state by going through the state machine - bool bootstrap(); - //! establish - bool establish(); - //! handShake - bool handShake(); - //! negotiateCodec - bool negotiateCodec(); - //! initiateResourceNegotiation - bool initiateResourceNegotiation(); - //! initiateCodecResourceNegotiation - bool initiateCodecResourceNegotiation(); - //! tearDown - void tearDown(); - //! write Request Type - int writeRequestType(RequestType type); - //! read Request Type - int readRequestType(RequestType &type); - //! read Respond - int readRespond(RespondCode &code, std::string &message); - //! write respond - int writeRespond(RespondCode code, std::string message); - //! getRespondCodeContext - RespondCodeContext *getRespondCodeContext(RespondCode code) - { - for (unsigned int i = 0; i < sizeof(respondCodeContext)/sizeof(RespondCodeContext); i++) - { - if (respondCodeContext[i].code == code) - { - return &respondCodeContext[i]; - } - } - return NULL; - } - //! getPeer - Site2SitePeer *getPeer() - { - return _peer; - } - //! Creation of a new transaction, return the transaction ID if success, - //! Return NULL when any error occurs - Transaction *createTransaction(std::string &transactionID, TransferDirection direction); - //! Receive the data packet from the transaction - //! Return false when any error occurs - bool receive(std::string transactionID, DataPacket *packet, bool &eof); - //! Send the data packet from the transaction - //! Return false when any error occurs - bool send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session); - //! Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received. - bool confirm(std::string transactionID); - //! Cancel the transaction - void cancel(std::string transactionID); - //! Complete the transaction - bool complete(std::string transactionID); - //! Error the transaction - void error(std::string transactionID); - //! Receive flow files for the process session - void receiveFlowFiles(ProcessContext *context, ProcessSession *session); - //! Transfer flow files for the process session - void transferFlowFiles(ProcessContext *context, ProcessSession *session); - //! deleteTransaction - void deleteTransaction(std::string transactionID); - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(DataPacket *packet) - : _packet(packet) {} - DataPacket *_packet; - void process(std::ofstream *stream) { - uint8_t buffer[8192]; - int len = _packet->_size; - while (len > 0) - { - int size = std::min(len, (int) sizeof(buffer)); - int ret = _packet->_protocol->_peer->readData(buffer, size, &_packet->_transaction->_crc); - if (ret != size) - { - _packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed %d", size, ret); - break; - } - stream->write((const char *) buffer, size); - len -= size; - } - } - }; - //! Nest Callback Class for read stream - class ReadCallback : public InputStreamCallback - { - public: - ReadCallback(DataPacket *packet) - : _packet(packet) {} - DataPacket *_packet; - void process(std::ifstream *stream) { - _packet->_size = 0; - uint8_t buffer[8192]; - int readSize; - while (stream->good()) - { - if (!stream->read((char *)buffer, 8192)) - readSize = stream->gcount(); - else - readSize = 8192; - int ret = _packet->_protocol->_peer->write(buffer, readSize, &_packet->_transaction->_crc); - if (ret != readSize) - { - _packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret); - break; - } - _packet->_size += readSize; - } - } - }; - -protected: - -private: - //! Mutex for protection - std::mutex _mtx; - //! Logger - Logger *_logger; - //! Configure - Configure *_configure; - //! Batch Count - std::atomic<uint64_t> _batchCount; - //! Batch Size - std::atomic<uint64_t> _batchSize; - //! Batch Duration in msec - std::atomic<uint64_t> _batchDuration; - //! Timeout in msec - std::atomic<uint64_t> _timeOut; - //! Peer Connection - Site2SitePeer *_peer; - //! portId - uuid_t _portId; - //! portIDStr - std::string _portIdStr; - //! BATCH_SEND_NANOS - uint64_t _batchSendNanos; - //! Peer State - PeerState _peerState; - uint32_t _supportedVersion[5]; - uint32_t _currentVersion; - int _currentVersionIndex; - uint32_t _supportedCodecVersion[1]; - uint32_t _currentCodecVersion; - int _currentCodecVersionIndex; - //! commsIdentifier - std::string _commsIdentifier; - //! transaction map - std::map<std::string, Transaction *> _transactionMap; - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Site2SiteClientProtocol(const Site2SiteClientProtocol &parent); - Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent); -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Site2SitePeer.h ---------------------------------------------------------------------- diff --git a/inc/Site2SitePeer.h b/inc/Site2SitePeer.h deleted file mode 100644 index ff11637..0000000 --- a/inc/Site2SitePeer.h +++ /dev/null @@ -1,364 +0,0 @@ -/** - * @file Site2SitePeer.h - * Site2SitePeer class declaration for site to site peer - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __SITE2SITE_PEER_H__ -#define __SITE2SITE_PEER_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include <mutex> -#include <atomic> -#include "TimeUtil.h" -#include "Logger.h" -#include "Configure.h" -#include "Property.h" - -class CRC32 -{ -public: - CRC32() { - crc = 0; - - if (tableInit) - return; - - tableInit = true; - unsigned int poly = 0xedb88320; - unsigned int temp = 0; - for(unsigned int i = 0; i < 256; ++i) { - temp = i; - for(int j = 8; j > 0; --j) { - if((temp & 1) == 1) { - temp = (unsigned int)((temp >> 1) ^ poly); - }else { - temp >>= 1; - } - } - table[i] = temp; - } - } - - unsigned int update(uint8_t * bytes, size_t size) { - crc = crc ^ ~0U; - for(unsigned int i = 0; i < size; ++i) { - uint8_t index = (uint8_t)(((crc) & 0xff) ^ bytes[i]); - crc = (unsigned int)((crc >> 8) ^ table[index]); - } - crc = crc ^ ~0U; - return crc; - } - - long getCRC() - { - return crc; - } - -private: - static unsigned int table[256]; - static std::atomic<bool> tableInit; - unsigned int crc; -}; - -static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'}; - -//! Site2SitePeer Class -class Site2SitePeer -{ -public: - //! Constructor - /*! - * Create a new site2site peer - */ - Site2SitePeer(std::string host, uint16_t port) { - _logger = Logger::getLogger(); - _configure = Configure::getConfigure(); - _socket = 0; - _host = host; - _port = port; - _yieldExpiration = 0; - _timeOut = 30000; // 30 seconds - _url = "nifi://" + _host + ":" + std::to_string(_port); - } - //! Destructor - virtual ~Site2SitePeer() { Close();} - //! Set Processor yield period in MilliSecond - void setYieldPeriodMsec(uint64_t period) { - _yieldPeriodMsec = period; - } - //! get URL - std::string getURL() { - return _url; - } - //! Get Processor yield period in MilliSecond - uint64_t getYieldPeriodMsec(void) { - return(_yieldPeriodMsec); - } - //! Yield based on the yield period - void yield() - { - _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - } - //! setHostName - void setHostName(std::string host) - { - _host = host; - _url = "nifi://" + _host + ":" + std::to_string(_port); - } - //! setPort - void setPort(uint16_t port) - { - _port = port; - _url = "nifi://" + _host + ":" + std::to_string(_port); - } - //! getHostName - std::string getHostName() - { - return _host; - } - //! getPort - uint16_t getPort() - { - return _port; - } - //! Yield based on the input time - void yield(uint64_t time) - { - _yieldExpiration = (getTimeMillis() + time); - } - //! whether need be to yield - bool isYield() - { - if (_yieldExpiration > 0) - return (_yieldExpiration >= getTimeMillis()); - else - return false; - } - // clear yield expiration - void clearYield() - { - _yieldExpiration = 0; - } - //! Yield based on the yield period - void yield(std::string portId) - { - std::lock_guard<std::mutex> lock(_mtx); - uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); - _yieldExpirationPortIdMap[portId] = yieldExpiration; - } - //! Yield based on the input time - void yield(std::string portId, uint64_t time) - { - std::lock_guard<std::mutex> lock(_mtx); - uint64_t yieldExpiration = (getTimeMillis() + time); - _yieldExpirationPortIdMap[portId] = yieldExpiration; - } - //! whether need be to yield - bool isYield(std::string portId) - { - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); - if (it != _yieldExpirationPortIdMap.end()) - { - uint64_t yieldExpiration = it->second; - return (yieldExpiration >= getTimeMillis()); - } - else - { - return false; - } - } - //! clear yield expiration - void clearYield(std::string portId) - { - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId); - if (it != _yieldExpirationPortIdMap.end()) - { - _yieldExpirationPortIdMap.erase(portId); - } - } - //! setTimeOut - void setTimeOut(uint64_t time) - { - _timeOut = time; - } - //! getTimeOut - uint64_t getTimeOut() - { - return _timeOut; - } - int write(uint8_t value, CRC32 *crc = NULL) - { - return sendData(&value, 1, crc); - } - int write(char value, CRC32 *crc = NULL) - { - return sendData((uint8_t *)&value, 1, crc); - } - int write(uint32_t value, CRC32 *crc = NULL) - { - uint8_t temp[4]; - - temp[0] = (value & 0xFF000000) >> 24; - temp[1] = (value & 0x00FF0000) >> 16; - temp[2] = (value & 0x0000FF00) >> 8; - temp[3] = (value & 0x000000FF); - return sendData(temp, 4, crc); - } - int write(uint16_t value, CRC32 *crc = NULL) - { - uint8_t temp[2]; - temp[0] = (value & 0xFF00) >> 8; - temp[1] = (value & 0xFF); - return sendData(temp, 2, crc); - } - int write(uint8_t *value, int len, CRC32 *crc = NULL) - { - return sendData(value, len, crc); - } - int write(uint64_t value, CRC32 *crc = NULL) - { - uint8_t temp[8]; - - temp[0] = (value >> 56) & 0xFF; - temp[1] = (value >> 48) & 0xFF; - temp[2] = (value >> 40) & 0xFF; - temp[3] = (value >> 32) & 0xFF; - temp[4] = (value >> 24) & 0xFF; - temp[5] = (value >> 16) & 0xFF; - temp[6] = (value >> 8) & 0xFF; - temp[7] = (value >> 0) & 0xFF; - return sendData(temp, 8, crc); - } - int write(bool value, CRC32 *crc = NULL) - { - uint8_t temp = value; - return write(temp, crc); - } - int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL); - int read(uint8_t &value, CRC32 *crc = NULL) - { - uint8_t buf; - - int ret = readData(&buf, 1, crc); - if (ret == 1) - value = buf; - return ret; - } - int read(uint16_t &value, CRC32 *crc = NULL) - { - uint8_t buf[2]; - - int ret = readData(buf, 2, crc); - if (ret == 2) - value = (buf[0] << 8) | buf[1]; - return ret; - } - int read(char &value, CRC32 *crc = NULL) - { - uint8_t buf; - - int ret = readData(&buf, 1, crc); - if (ret == 1) - value = (char) buf; - return ret; - } - int read(uint8_t *value, int len, CRC32 *crc = NULL) - { - return readData(value, len, crc); - } - int read(uint32_t &value, CRC32 *crc = NULL) - { - uint8_t buf[4]; - - int ret = readData(buf, 4, crc); - if (ret == 4) - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - return ret; - } - int read(uint64_t &value, CRC32 *crc = NULL) - { - uint8_t buf[8]; - - int ret = readData(buf, 8, crc); - if (ret == 8) - { - value = ((uint64_t) buf[0] << 56) | - ((uint64_t) (buf[1] & 255) << 48) | - ((uint64_t) (buf[2] & 255) << 40) | - ((uint64_t) (buf[3] & 255) << 32) | - ((uint64_t) (buf[4] & 255) << 24) | - ((uint64_t) (buf[5] & 255) << 16) | - ((uint64_t) (buf[6] & 255) << 8) | - ((uint64_t) (buf[7] & 255) << 0); - } - return ret; - } - int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL); - //! open connection to the peer - bool Open(); - //! close connection to the peer - void Close(); - //! Send Data via the socket, return -1 for failure - int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL); - //! Read length into buf, return -1 for failure and 0 for EOF - int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL); - //! Select on the socket - int Select(int msec); - -protected: - -private: - //! Mutex for protection - std::mutex _mtx; - //! S2S server Name - std::string _host; - //! S2S server port - uint16_t _port; - //! socket to server - int _socket; - //! URL - std::string _url; - //! socket timeout; - std::atomic<uint64_t> _timeOut; - //! Logger - Logger *_logger; - //! Configure - Configure *_configure; - //! Yield Period in Milliseconds - std::atomic<uint64_t> _yieldPeriodMsec; - //! Yield Expiration - std::atomic<uint64_t> _yieldExpiration; - //! Yield Expiration per destination PortID - std::map<std::string, uint64_t> _yieldExpirationPortIdMap; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Site2SitePeer(const Site2SitePeer &parent); - Site2SitePeer &operator=(const Site2SitePeer &parent); -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TailFile.h ---------------------------------------------------------------------- diff --git a/inc/TailFile.h b/inc/TailFile.h deleted file mode 100644 index 5c4ba09..0000000 --- a/inc/TailFile.h +++ /dev/null @@ -1,93 +0,0 @@ -/** - * @file TailFile.h - * TailFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __TAIL_FILE_H__ -#define __TAIL_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! TailFile Class -class TailFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - TailFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - _stateRecovered = false; - } - //! Destructor - virtual ~TailFile() - { - storeState(); - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property FileName; - static Property StateFile; - //! Supported Relationships - static Relationship Success; - -public: - //! OnTrigger method, implemented by NiFi TailFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi TailFile - virtual void initialize(void); - //! recoverState - void recoverState(); - //! storeState - void storeState(); - -protected: - -private: - //! Logger - Logger *_logger; - std::string _fileLocation; - //! Property Specified Tailed File Name - std::string _fileName; - //! File to save state - std::string _stateFile; - //! State related to the tailed file - std::string _currentTailFileName; - uint64_t _currentTailFilePosition; - bool _stateRecovered; - uint64_t _currentTailFileCreatedTime; - //! Utils functions for parse state file - std::string trimLeft(const std::string& s); - std::string trimRight(const std::string& s); - void parseStateFileLine(char *buf); - void checkRollOver(); - -}; - -//! Matched File Item for Roll over check -typedef struct { - std::string fileName; - uint64_t modifiedTime; -} TailMatchedFileItem; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TimeUtil.h ---------------------------------------------------------------------- diff --git a/inc/TimeUtil.h b/inc/TimeUtil.h deleted file mode 100644 index b024245..0000000 --- a/inc/TimeUtil.h +++ /dev/null @@ -1,82 +0,0 @@ -/** - * @file TimeUtil.h - * Basic Time Utility - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __TIME_UTIL_H__ -#define __TIME_UTIL_H__ - -#include <time.h> -#include <sys/time.h> -#include <string.h> -#include <unistd.h> -#include <string.h> -#include <iostream> - -#ifdef __MACH__ -#include <mach/clock.h> -#include <mach/mach.h> -#endif - -inline uint64_t getTimeMillis() -{ - uint64_t value; - - timeval time; - gettimeofday(&time, NULL); - value = ((uint64_t) (time.tv_sec) * 1000) + (time.tv_usec / 1000); - - return value; -} - -inline uint64_t getTimeNano() -{ - struct timespec ts; - -#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time - clock_serv_t cclock; - mach_timespec_t mts; - host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); - clock_get_time(cclock, &mts); - mach_port_deallocate(mach_task_self(), cclock); - ts.tv_sec = mts.tv_sec; - ts.tv_nsec = mts.tv_nsec; -#else - clock_gettime(CLOCK_REALTIME, &ts); -#endif - - return ((uint64_t) (ts.tv_sec) * 1000000000 + ts.tv_nsec); -} - -//! Convert millisecond since UTC to a time display string -inline std::string getTimeStr(uint64_t msec) -{ - char date[120]; - time_t second = (time_t) (msec/1000); - msec = msec % 1000; - strftime(date, sizeof(date) / sizeof(*date), "%Y-%m-%d %H:%M:%S", - localtime(&second)); - - std::string ret = date; - date[0] = '\0'; - sprintf(date, ".%03llu", (unsigned long long) msec); - - ret += date; - return ret; -} - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/TimerDrivenSchedulingAgent.h ---------------------------------------------------------------------- diff --git a/inc/TimerDrivenSchedulingAgent.h b/inc/TimerDrivenSchedulingAgent.h deleted file mode 100644 index 9195745..0000000 --- a/inc/TimerDrivenSchedulingAgent.h +++ /dev/null @@ -1,66 +0,0 @@ -/** - * @file TimerDrivenSchedulingAgent.h - * TimerDrivenSchedulingAgent class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __TIMER_DRIVEN_SCHEDULING_AGENT_H__ -#define __TIMER_DRIVEN_SCHEDULING_AGENT_H__ - -#include "Logger.h" -#include "Configure.h" -#include "Processor.h" -#include "ProcessContext.h" -#include "SchedulingAgent.h" - -//! TimerDrivenSchedulingAgent Class -class TimerDrivenSchedulingAgent : public SchedulingAgent -{ -public: - //! Constructor - /*! - * Create a new processor - */ - TimerDrivenSchedulingAgent() - : SchedulingAgent() - { - } - //! Destructor - virtual ~TimerDrivenSchedulingAgent() - { - } - //! Run function for the thread - static void run(TimerDrivenSchedulingAgent *agent, Processor *processor); - -public: - //! schedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void schedule(Processor *processor); - //! unschedule, overwritten by different DrivenTimerDrivenSchedulingAgent - virtual void unschedule(Processor *processor); - -protected: - -private: - //! Threads - std::map<std::string, std::vector<std::thread *>> _threads; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - TimerDrivenSchedulingAgent(const TimerDrivenSchedulingAgent &parent); - TimerDrivenSchedulingAgent &operator=(const TimerDrivenSchedulingAgent &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/spdlog/async_logger.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/async_logger.h b/inc/spdlog/async_logger.h deleted file mode 100644 index 517ce92..0000000 --- a/inc/spdlog/async_logger.h +++ /dev/null @@ -1,90 +0,0 @@ -/*************************************************************************/ -/* spdlog - an extremely fast and easy to use c++11 logging library. */ -/* Copyright (c) 2014 Gabi Melman. */ -/* */ -/* Permission is hereby granted, free of charge, to any person obtaining */ -/* a copy of this software and associated documentation files (the */ -/* "Software"), to deal in the Software without restriction, including */ -/* without limitation the rights to use, copy, modify, merge, publish, */ -/* distribute, sublicense, and/or sell copies of the Software, and to */ -/* permit persons to whom the Software is furnished to do so, subject to */ -/* the following conditions: */ -/* */ -/* The above copyright notice and this permission notice shall be */ -/* included in all copies or substantial portions of the Software. */ -/* */ -/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ -/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ -/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ -/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ -/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ -/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ -/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -/*************************************************************************/ - -#pragma once - -// Very fast asynchronous logger (millions of logs per second on an average desktop) -// Uses pre allocated lockfree queue for maximum throughput even under large number of threads. -// Creates a single back thread to pop messages from the queue and log them. -// -// Upon each log write the logger: -// 1. Checks if its log level is enough to log the message -// 2. Push a new copy of the message to a queue (or block the caller until space is available in the queue) -// 3. will throw spdlog_ex upon log exceptions -// Upong destruction, logs all remaining messages in the queue before destructing.. - -#include <chrono> -#include <functional> -#include "common.h" -#include "logger.h" -#include "spdlog.h" - - -namespace spdlog -{ - -namespace details -{ -class async_log_helper; -} - -class async_logger :public logger -{ -public: - template<class It> - async_logger(const std::string& name, - const It& begin, - const It& end, - size_t queue_size, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function<void()>& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); - - async_logger(const std::string& logger_name, - sinks_init_list sinks, - size_t queue_size, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function<void()>& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); - - async_logger(const std::string& logger_name, - sink_ptr single_sink, - size_t queue_size, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, - const std::function<void()>& worker_warmup_cb = nullptr, - const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); - - -protected: - void _log_msg(details::log_msg& msg) override; - void _set_formatter(spdlog::formatter_ptr msg_formatter) override; - void _set_pattern(const std::string& pattern) override; - -private: - std::unique_ptr<details::async_log_helper> _async_log_helper; -}; -} - - -#include "./details/async_logger_impl.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/spdlog/common.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/common.h b/inc/spdlog/common.h deleted file mode 100644 index cde5a9e..0000000 --- a/inc/spdlog/common.h +++ /dev/null @@ -1,116 +0,0 @@ -/*************************************************************************/ -/* spdlog - an extremely fast and easy to use c++11 logging library. */ -/* Copyright (c) 2014 Gabi Melman. */ -/* */ -/* Permission is hereby granted, free of charge, to any person obtaining */ -/* a copy of this software and associated documentation files (the */ -/* "Software"), to deal in the Software without restriction, including */ -/* without limitation the rights to use, copy, modify, merge, publish, */ -/* distribute, sublicense, and/or sell copies of the Software, and to */ -/* permit persons to whom the Software is furnished to do so, subject to */ -/* the following conditions: */ -/* */ -/* The above copyright notice and this permission notice shall be */ -/* included in all copies or substantial portions of the Software. */ -/* */ -/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ -/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ -/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ -/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ -/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ -/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ -/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -/*************************************************************************/ - -#pragma once - -#include <string> -#include <initializer_list> -#include <chrono> -#include <memory> - -//visual studio does not support noexcept yet -#ifndef _MSC_VER -#define SPDLOG_NOEXCEPT noexcept -#else -#define SPDLOG_NOEXCEPT throw() -#endif - - -namespace spdlog -{ - -class formatter; - -namespace sinks -{ -class sink; -} - -// Common types across the lib -using log_clock = std::chrono::system_clock; -using sink_ptr = std::shared_ptr < sinks::sink > ; -using sinks_init_list = std::initializer_list < sink_ptr > ; -using formatter_ptr = std::shared_ptr<spdlog::formatter>; - - -//Log level enum -namespace level -{ -typedef enum -{ - trace = 0, - debug = 1, - info = 2, - notice = 3, - warn = 4, - err = 5, - critical = 6, - alert = 7, - emerg = 8, - off = 9 -} level_enum; - -static const char* level_names[] { "trace", "debug", "info", "notice", "warning", "error", "critical", "alert", "emerg", "off"}; - -static const char* short_level_names[] { "T", "D", "I", "N", "W", "E", "C", "A", "M", "O"}; - -inline const char* to_str(spdlog::level::level_enum l) -{ - return level_names[l]; -} - -inline const char* to_short_str(spdlog::level::level_enum l) -{ - return short_level_names[l]; -} -} //level - - -// -// Async overflow policy - block by default. -// -enum class async_overflow_policy -{ - block_retry, // Block / yield / sleep until message can be enqueued - discard_log_msg // Discard the message it enqueue fails -}; - - -// -// Log exception -// -class spdlog_ex : public std::exception -{ -public: - spdlog_ex(const std::string& msg) :_msg(msg) {} - const char* what() const SPDLOG_NOEXCEPT override - { - return _msg.c_str(); - } -private: - std::string _msg; - -}; - -} //spdlog