MINIFI-34 Establishing CMake build system to provide build functionality equivalent to pre-existing Makefile.
Updating .travis.yml to support CMake build system. Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b02af540 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b02af540 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b02af540 Branch: refs/heads/master Commit: b02af540c4bc664be77174b32c4350dd7971b13d Parents: 8beb377 Author: Aldrin Piri <ald...@apache.org> Authored: Thu May 5 17:32:57 2016 -0400 Committer: Aldrin Piri <ald...@apache.org> Committed: Thu Oct 13 11:06:55 2016 -0400 ---------------------------------------------------------------------- .gitignore | 7 + .travis.yml | 2 +- CMakeLists.txt | 74 + Makefile | 109 - README.md | 90 +- inc/Configure.h | 115 - inc/Connection.h | 201 -- inc/Exception.h | 95 - inc/FlowControlProtocol.h | 339 -- inc/FlowController.h | 248 -- inc/FlowFileRecord.h | 220 -- inc/GenerateFlowFile.h | 87 - inc/GetFile.h | 117 - inc/ListenSyslog.h | 209 -- inc/LogAttribute.h | 128 - inc/Logger.h | 154 - inc/ProcessContext.h | 99 - inc/ProcessGroup.h | 182 - inc/ProcessSession.h | 116 - inc/Processor.h | 346 -- inc/Property.h | 344 -- inc/RealTimeDataCollector.h | 131 - inc/Relationship.h | 87 - inc/RemoteProcessorGroupPort.h | 96 - inc/ResourceClaim.h | 92 - inc/SchedulingAgent.h | 98 - inc/Site2SiteClientProtocol.h | 638 ---- inc/Site2SitePeer.h | 364 -- inc/TailFile.h | 93 - inc/TimeUtil.h | 82 - inc/TimerDrivenSchedulingAgent.h | 66 - inc/spdlog/async_logger.h | 90 - inc/spdlog/common.h | 116 - inc/spdlog/details/async_log_helper.h | 326 -- inc/spdlog/details/async_logger_impl.h | 82 - inc/spdlog/details/file_helper.h | 144 - inc/spdlog/details/format.cc | 1353 -------- inc/spdlog/details/format.h | 3155 ------------------ inc/spdlog/details/line_logger.h | 221 -- inc/spdlog/details/log_msg.h | 98 - inc/spdlog/details/logger_impl.h | 320 -- inc/spdlog/details/mpmc_bounded_q.h | 175 - inc/spdlog/details/null_mutex.h | 43 - inc/spdlog/details/os.h | 198 -- inc/spdlog/details/pattern_formatter_impl.h | 628 ---- inc/spdlog/details/registry.h | 180 - inc/spdlog/details/spdlog_impl.h | 154 - inc/spdlog/formatter.h | 58 - inc/spdlog/logger.h | 132 - inc/spdlog/sinks/base_sink.h | 66 - inc/spdlog/sinks/file_sinks.h | 232 -- inc/spdlog/sinks/null_sink.h | 52 - inc/spdlog/sinks/ostream_sink.h | 67 - inc/spdlog/sinks/sink.h | 42 - inc/spdlog/sinks/stdout_sinks.h | 71 - inc/spdlog/sinks/syslog_sink.h | 102 - inc/spdlog/spdlog.h | 155 - inc/spdlog/tweakme.h | 74 - include/spdlog/async_logger.h | 90 + include/spdlog/common.h | 116 + include/spdlog/details/async_log_helper.h | 326 ++ include/spdlog/details/async_logger_impl.h | 82 + include/spdlog/details/file_helper.h | 144 + include/spdlog/details/format.cc | 1353 ++++++++ include/spdlog/details/format.h | 3155 ++++++++++++++++++ include/spdlog/details/line_logger.h | 221 ++ include/spdlog/details/log_msg.h | 98 + include/spdlog/details/logger_impl.h | 320 ++ include/spdlog/details/mpmc_bounded_q.h | 175 + include/spdlog/details/null_mutex.h | 43 + include/spdlog/details/os.h | 198 ++ include/spdlog/details/pattern_formatter_impl.h | 628 ++++ include/spdlog/details/registry.h | 180 + include/spdlog/details/spdlog_impl.h | 154 + include/spdlog/formatter.h | 58 + include/spdlog/logger.h | 132 + include/spdlog/sinks/base_sink.h | 66 + include/spdlog/sinks/file_sinks.h | 232 ++ include/spdlog/sinks/null_sink.h | 52 + include/spdlog/sinks/ostream_sink.h | 67 + include/spdlog/sinks/sink.h | 42 + include/spdlog/sinks/stdout_sinks.h | 71 + include/spdlog/sinks/syslog_sink.h | 102 + include/spdlog/spdlog.h | 155 + include/spdlog/tweakme.h | 74 + libminifi/CMakeLists.txt | 51 + libminifi/include/Configure.h | 115 + libminifi/include/Connection.h | 201 ++ libminifi/include/Exception.h | 95 + libminifi/include/FlowControlProtocol.h | 339 ++ libminifi/include/FlowController.h | 248 ++ libminifi/include/FlowFileRecord.h | 220 ++ libminifi/include/GenerateFlowFile.h | 87 + libminifi/include/GetFile.h | 117 + libminifi/include/ListenSyslog.h | 209 ++ libminifi/include/LogAttribute.h | 128 + libminifi/include/Logger.h | 154 + libminifi/include/ProcessContext.h | 99 + libminifi/include/ProcessGroup.h | 182 + libminifi/include/ProcessSession.h | 116 + libminifi/include/Processor.h | 346 ++ libminifi/include/Property.h | 344 ++ libminifi/include/RealTimeDataCollector.h | 131 + libminifi/include/Relationship.h | 87 + libminifi/include/RemoteProcessorGroupPort.h | 96 + libminifi/include/ResourceClaim.h | 92 + libminifi/include/SchedulingAgent.h | 98 + libminifi/include/Site2SiteClientProtocol.h | 638 ++++ libminifi/include/Site2SitePeer.h | 364 ++ libminifi/include/TailFile.h | 93 + libminifi/include/TimeUtil.h | 82 + libminifi/include/TimerDrivenSchedulingAgent.h | 66 + libminifi/src/Configure.cpp | 167 + libminifi/src/Connection.cpp | 160 + libminifi/src/FlowControlProtocol.cpp | 541 +++ libminifi/src/FlowController.cpp | 1190 +++++++ libminifi/src/FlowFileRecord.cpp | 231 ++ libminifi/src/GenerateFlowFile.cpp | 134 + libminifi/src/GetFile.cpp | 295 ++ libminifi/src/ListenSyslog.cpp | 342 ++ libminifi/src/LogAttribute.cpp | 158 + libminifi/src/Logger.cpp | 27 + libminifi/src/ProcessGroup.cpp | 314 ++ libminifi/src/ProcessSession.cpp | 731 ++++ libminifi/src/Processor.cpp | 451 +++ libminifi/src/RealTimeDataCollector.cpp | 482 +++ libminifi/src/RemoteProcessorGroupPort.cpp | 100 + libminifi/src/ResourceClaim.cpp | 45 + libminifi/src/SchedulingAgent.cpp | 86 + libminifi/src/Site2SiteClientProtocol.cpp | 1313 ++++++++ libminifi/src/Site2SitePeer.cpp | 435 +++ libminifi/src/TailFile.cpp | 272 ++ libminifi/src/TimerDrivenSchedulingAgent.cpp | 134 + libminifi/test/FlowFileRecordTest.cpp | 28 + libminifi/test/Server.cpp | 607 ++++ main/CMakeLists.txt | 44 + src/Configure.cpp | 167 - src/Connection.cpp | 160 - src/FlowControlProtocol.cpp | 541 --- src/FlowController.cpp | 1190 ------- src/FlowFileRecord.cpp | 231 -- src/GenerateFlowFile.cpp | 134 - src/GetFile.cpp | 295 -- src/ListenSyslog.cpp | 342 -- src/LogAttribute.cpp | 158 - src/Logger.cpp | 27 - src/ProcessGroup.cpp | 314 -- src/ProcessSession.cpp | 731 ---- src/Processor.cpp | 451 --- src/RealTimeDataCollector.cpp | 482 --- src/RemoteProcessorGroupPort.cpp | 100 - src/ResourceClaim.cpp | 45 - src/SchedulingAgent.cpp | 86 - src/Site2SiteClientProtocol.cpp | 1313 -------- src/Site2SitePeer.cpp | 435 --- src/TailFile.cpp | 272 -- src/TimerDrivenSchedulingAgent.cpp | 134 - test/FlowFileRecordTest.cpp | 28 - test/Server.cpp | 607 ---- thirdparty/yaml-cpp-yaml-cpp-0.5.3/Makefile | 40 - 160 files changed, 21572 insertions(+), 21493 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 9d78a7f..69c2234 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,10 @@ # Filter out generated files from the included libuuid thirdparty/uuid/tst_uuid* assemblies +CMakeCache.txt +CMakeFiles +CMakeScripts +Makefile +cmake_install.cmake +install_manifest.txt +CTestTestfile.cmake http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index b9c9d20..eb2baa5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,7 +20,7 @@ script: # Establish updated toolchain as default - sudo unlink /usr/bin/gcc && sudo ln -s /usr/bin/gcc-4.8 /usr/bin/gcc - sudo unlink /usr/bin/g++ && sudo ln -s /usr/bin/g++-4.8 /usr/bin/g++ - - make + - mkdir ./build && cd ./build && cmake .. && make addons: apt: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..1cc95c2 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required(VERSION 2.6) + +set(PROJECT "nifi-minifi-cpp") +set(VERSION "0.1.0") + +#### Establish Project Configuration #### +# Enable usage of the VERSION specifier +# https://cmake.org/cmake/help/v3.0/policy/CMP0048.html#policy:CMP0048 +cmake_policy(SET CMP0048 NEW) + +project(${PROJECT} + VERSION ${VERSION}) + +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +# Provide custom modules for the project +list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") + +#file(GLOB SOURCES "libminifi/src/*.cpp") +add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3) +add_subdirectory(libminifi) +add_subdirectory(main) + +# Generate source assembly +set(ASSEMBLY_BASE_NAME "${CMAKE_PROJECT_NAME}-${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}.${PROJECT_VERSION_PATCH}") +set(CPACK_SOURCE_GENERATOR "TGZ") +set(CPACK_SOURCE_PACKAGE_FILE_NAME "${ASSEMBLY_BASE_NAME}-source") +set(CPACK_SOURCE_IGNORE_FILES "/build/;/.bzr/;~$;${CPACK_SOURCE_IGNORE_FILES}") + +# Generate binary assembly +install(FILES conf/minifi.properties + DESTINATION conf + COMPONENT bin) + +install(PROGRAMS bin/minifi.sh + DESTINATION bin + COMPONENT bin) + +install(FILES LICENSE README.md NOTICE + DESTINATION . + COMPONENT bin) + +set(CPACK_GENERATOR "TGZ") +set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "Apache NiFi MiNiFi C++ version ${VERSION}") +set(CPACK_PACKAGE_VENDOR "Apache NiFi") +set(CPACK_PACKAGE_DESCRIPTION_FILE "${CMAKE_CURRENT_SOURCE_DIR}/README.md") +set(CPACK_RESOURCE_FILE_LICENSE "${CMAKE_CURRENT_SOURCE_DIR}/LICENSE") +set(CPACK_PACKAGE_FILE_NAME "${ASSEMBLY_BASE_NAME}") +set(CPACK_BINARY_TGZ, "ON") + +set(CPACK_ARCHIVE_COMPONENT_INSTALL ON) +set(CPACK_COMPONENTS_ALL bin) + +include(CPack) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/Makefile ---------------------------------------------------------------------- diff --git a/Makefile b/Makefile deleted file mode 100644 index 0b03842..0000000 --- a/Makefile +++ /dev/null @@ -1,109 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License - - -# for ARM make CROSS_COMPILE=arm-linux-gnueabi ARCH=arm -VERSION=0.1.0 -CC=$(CROSS_COMPILE)-g++ -AR=$(CROSS_COMPILE)-ar -BUILD_DIR= ./build -TARGET_DIR=./target -ASSEMBLIES_DIR = ./assemblies -TARGET_LIB=libminifi.a -PROJECT=minifi -TARGET_EXE=$(PROJECT) -CFLAGS=-Os -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall -g -Wno-unused-private-field -INCLUDES=-I./inc -I./src -I./test -I./thirdparty -I/usr/include/libxml2 -I./thirdparty/yaml-cpp-yaml-cpp-0.5.3/include -LDDIRECTORY=-L./build -L./thirdparty/uuid -L./thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/ -LDFLAGS=-lminifi -lxml2 -pthread -luuid -lyaml-cpp - -UNAME_S := $(shell uname -s) -ifeq ($(UNAME_S),Linux) - LDFLAGS += -lrt -endif -ifeq ($(UNAME_S),Darwin) -endif - -OBJS:=$(shell /bin/ls src/*.cpp | xargs -n1 basename 2>/dev/null | awk '/\.cpp$$/{a=$$0; gsub("\\.cpp$$",".o", a); print "$(BUILD_DIR)/" a}') -TESTS:=Server - -all: thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a directory $(BUILD_DIR)/$(TARGET_LIB) minifi tests assembly-pkgs - -thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a: - make -C thirdparty/yaml-cpp-yaml-cpp-0.5.3 - - -.PHONY: directory -directory: - mkdir -p $(BUILD_DIR) - mkdir -p $(TARGET_DIR) - make -C thirdparty/uuid - -$(BUILD_DIR)/%.o: src/%.cpp - $(CC) $(CFLAGS) $(INCLUDES) -o $@ -c $< - -$(BUILD_DIR)/$(TARGET_LIB): $(OBJS) - $(AR) crs $@ $(OBJS) - -minifi: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a - $(CC) $(CFLAGS) $(INCLUDES) -o $(BUILD_DIR)/$(TARGET_EXE) main/MiNiFiMain.cpp $(LDDIRECTORY) $(LDFLAGS) - cp $(BUILD_DIR)/$(TARGET_EXE) $(TARGET_DIR)/$(TARGET_EXE) - cp $(BUILD_DIR)/$(TARGET_EXE) bin/$(TARGET_EXE) - -.PHONY: tests -tests: $(BUILD_DIR)/$(TARGET_LIB) thirdparty/yaml-cpp-yaml-cpp-0.5.3/lib/libyaml-cpp.a - $(foreach TEST_NAME, $(TESTS),\ - $(CC) $(CFLAGS) $(INCLUDES) -o $(BUILD_DIR)/$(TEST_NAME) test/$(TEST_NAME).cpp $(LDDIRECTORY) $(LDFLAGS)) - -$(ASSEMBLIES_DIR) : - mkdir -p $(ASSEMBLIES_DIR) - -.PHONY: assembly-pkgs -assembly-pkgs: $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz - -$(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz : $(ASSEMBLIES_DIR) - mkdir -p $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source - cp -r LICENSE \ - NOTICE \ - README.md \ - inc \ - src \ - main \ - bin \ - conf \ - thirdparty \ - Makefile \ - $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source - tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-source.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-source - -$(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz : $(ASSEMBLIES_DIR) $(TARGET_EXE) - mkdir -p $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin - cp -R LICENSE \ - NOTICE \ - README.md \ - conf \ - bin \ - $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin - cp target/minifi $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin/bin/ - tar -czf $(ASSEMBLIES_DIR)/$(PROJECT)-$(VERSION)-bin.tar.gz -C $(ASSEMBLIES_DIR) $(PROJECT)-$(VERSION)-bin - -.PHONY: clean -clean: - rm -rf $(BUILD_DIR) - rm -rf $(TARGET_DIR) - rm -rf $(ASSEMBLIES_DIR) - make -C thirdparty/yaml-cpp-yaml-cpp-0.5.3 clean - make -C thirdparty/uuid clean http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 0b4ac50..c8abc33 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,8 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a ### To build #### Utilities -* Make +* CMake + * 2.8 or greater * gcc * 4.8.4 or greater * g++ @@ -77,33 +78,84 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a ## Getting Started ### Building -From your source checkout, perform `make` in the root of the directory where the Makefile is located. For parallel building, the '-j' or '--jobs' option maybe used. On an average development machine, a serial build takes approximately 90 seconds. +- From your source checkout, create a directory to perform the build (e.g. build) and cd into that directory. + + + # ~/Development/code/apache/nifi-minifi-cpp on git:master + $ mkdir build + # ~/Development/code/apache/nifi-minifi-cpp on git:master + $ cd build + + +- Perform a `cmake ..` to generate the project files + + + # ~/Development/code/apache/nifi-minifi-cpp on git:master + $ cmake .. + ... + -- Configuring done + -- Generating done + -- Build files have been written to: /Users/apiri/Development/code/apache/nifi-minifi-cpp/build + + +- Perform a build + # ~/Development/code/apache/nifi-minifi-cpp on git:master $ make - make -C thirdparty/yaml-cpp-yaml-cpp-0.5.3 - mkdir -p ./build - g++ -Os -I./include -c -o build/parse.o src/parse.cpp - mkdir -p ./build - g++ -Os -I./include -c -o build/parser.o src/parser.cpp - mkdir -p ./build - g++ -Os -I./include -c -o build/regex_yaml.o src/regex_yaml.cpp + Scanning dependencies of target gmock_main + Scanning dependencies of target gmock + Scanning dependencies of target minifi + Scanning dependencies of target gtest + Scanning dependencies of target yaml-cpp + [ 1%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/gtest/CMakeFiles/gtest.dir/src/gtest-all.cc.o + [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/gtest/src/gtest-all.cc.o + [ 3%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock.dir/src/gmock-all.cc.o + [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/gtest/src/gtest-all.cc.o + [ 6%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/gmock-1.7.0/CMakeFiles/gmock_main.dir/src/gmock-all.cc.o + [ 7%] Building CXX object libminifi/CMakeFiles/minifi.dir/src/Configure.cpp.o + ... + [ 97%] Linking CXX executable minifi + [ 97%] Built target minifiexe + [ 98%] Building CXX object thirdparty/yaml-cpp-yaml-cpp-0.5.3/test/CMakeFiles/run-tests.dir/node/node_test.cpp.o + [100%] Linking CXX executable run-tests + [100%] Built target run-tests + + + +- Create a binary assembly located in your build directory with suffix -bin.tar.gz + + + ~/Development/code/apache/nifi-minifi-cpp/build + $ make package + Run CPack packaging tool for source... + CPack: Create package using TGZ + CPack: Install projects + CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp + CPack: Create package + CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-bin.tar.gz generated. + + +- Create a source assembly located in your build directory with suffix -source.tar.gz + + + ~/Development/code/apache/nifi-minifi-cpp/build + $ make package_source + Run CPack packaging tool for source... + CPack: Create package using TGZ + CPack: Install projects + CPack: - Install directory: ~/Development/code/apache/nifi-minifi-cpp + CPack: Create package + CPack: - package: ~/Development/code/apache/nifi-minifi-cpp/build/nifi-minifi-cpp-0.1.0-source.tar.gz generated. + ### Cleaning -Generated files and artifacts can be removed by performing a `make clean`. +Remove the build directory created above. # ~/Development/code/apache/nifi-minifi-cpp on git:master - $ make clean - rm -rf ./build - rm -rf ./target - rm -rf ./assemblies - make -C thirdparty/yaml-cpp-yaml-cpp-0.5.3 clean - rm -rf ./lib ./build - make -C thirdparty/uuid clean - rm -f *.o libuuid.a - find ./ -iname "*.o" -exec rm -f {} \; + $ rm -rf ./build ### Configuring The 'conf' directory in the root contains a template flow.yml document. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Configure.h ---------------------------------------------------------------------- diff --git a/inc/Configure.h b/inc/Configure.h deleted file mode 100644 index d325fa0..0000000 --- a/inc/Configure.h +++ /dev/null @@ -1,115 +0,0 @@ -/** - * @file Configure.h - * Configure class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __CONFIGURE_H__ -#define __CONFIGURE_H__ - -#include <stdio.h> -#include <string> -#include <map> -#include <stdlib.h> -#include <errno.h> -#include <iostream> -#include <fstream> -#include "Logger.h" - -class Configure { -public: - //! Get the singleton logger instance - static Configure * getConfigure() - { - if (!_configure) - { - _configure = new Configure(); - } - return _configure; - } - //! nifi.flow.configuration.file - static const char *nifi_flow_configuration_file; - static const char *nifi_administrative_yield_duration; - static const char *nifi_bored_yield_duration; - static const char *nifi_server_name; - static const char *nifi_server_port; - static const char *nifi_server_report_interval; - - //! Clear the load config - void clear() - { - std::lock_guard<std::mutex> lock(_mtx); - _properties.clear(); - } - //! Set the config value - void set(std::string key, std::string value) - { - std::lock_guard<std::mutex> lock(_mtx); - _properties[key] = value; - } - //! Check whether the config value existed - bool has(std::string key) - { - std::lock_guard<std::mutex> lock(_mtx); - return (_properties.find(key) != _properties.end()); - } - //! Get the config value - bool get(std::string key, std::string &value); - // Trim String utils - std::string trim(const std::string& s); - std::string trimLeft(const std::string& s); - std::string trimRight(const std::string& s); - //! Parse one line in configure file like key=value - void parseConfigureFileLine(char *buf); - //! Load Configure File - void loadConfigureFile(const char *fileName); - //! Set the determined MINIFI_HOME - void setHome(std::string minifiHome) - { - _minifiHome = minifiHome; - } - - //! Get the determined MINIFI_HOME - std::string getHome() - { - return _minifiHome; - } - //! Parse Command Line - void parseCommandLine(int argc, char **argv); - -private: - //! Mutex for protection - std::mutex _mtx; - //! Logger - Logger *_logger; - //! Home location for this executable - std::string _minifiHome; - - Configure() - { - _logger = Logger::getLogger(); - } - virtual ~Configure() - { - - } - static Configure *_configure; - -protected: - std::map<std::string,std::string> _properties; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Connection.h ---------------------------------------------------------------------- diff --git a/inc/Connection.h b/inc/Connection.h deleted file mode 100644 index dc6b94b..0000000 --- a/inc/Connection.h +++ /dev/null @@ -1,201 +0,0 @@ -/** - * @file Connection.h - * Connection class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __CONNECTION_H__ -#define __CONNECTION_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> - -#include "FlowFileRecord.h" -#include "Relationship.h" -#include "Logger.h" - -//! Forwarder declaration -class Processor; - -//! Connection Class -class Connection -{ -public: - //! Constructor - /*! - * Create a new processor - */ - Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL); - //! Destructor - virtual ~Connection() {} - //! Set Connection Name - void setName(std::string name) { - _name = name; - } - //! Get Process Name - std::string getName(void) { - return (_name); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - } - //! Set Source Processor UUID - void setSourceProcessorUUID(uuid_t uuid) { - uuid_copy(_srcUUID, uuid); - } - //! Set Destination Processor UUID - void setDestinationProcessorUUID(uuid_t uuid) { - uuid_copy(_destUUID, uuid); - } - //! Get Source Processor UUID - void getSourceProcessorUUID(uuid_t uuid) { - uuid_copy(uuid, _srcUUID); - } - //! Get Destination Processor UUID - void getDestinationProcessorUUID(uuid_t uuid) { - uuid_copy(uuid, _destUUID); - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - return false; - } - //! Set Connection Source Processor - void setSourceProcessor(Processor *source) { - _srcProcessor = source; - } - // ! Get Connection Source Processor - Processor *getSourceProcessor() { - return _srcProcessor; - } - //! Set Connection Destination Processor - void setDestinationProcessor(Processor *dest) { - _destProcessor = dest; - } - // ! Get Connection Destination Processor - Processor *getDestinationProcessor() { - return _destProcessor; - } - //! Set Connection relationship - void setRelationship(Relationship relationship) { - _relationship = relationship; - } - // ! Get Connection relationship - Relationship getRelationship() { - return _relationship; - } - //! Set Max Queue Size - void setMaxQueueSize(uint64_t size) - { - _maxQueueSize = size; - } - //! Get Max Queue Size - uint64_t getMaxQueueSize() - { - return _maxQueueSize; - } - //! Set Max Queue Data Size - void setMaxQueueDataSize(uint64_t size) - { - _maxQueueDataSize = size; - } - //! Get Max Queue Data Size - uint64_t getMaxQueueDataSize() - { - return _maxQueueDataSize; - } - //! Set Flow expiration duration in millisecond - void setFlowExpirationDuration(uint64_t duration) - { - _expiredDuration = duration; - } - //! Get Flow expiration duration in millisecond - uint64_t getFlowExpirationDuration() - { - return _expiredDuration; - } - //! Check whether the queue is empty - bool isEmpty(); - //! Check whether the queue is full to apply back pressure - bool isFull(); - //! Get queue size - uint64_t getQueueSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _queue.size(); - } - //! Get queue data size - uint64_t getQueueDataSize() - { - return _maxQueueDataSize; - } - //! Put the flow file into queue - void put(FlowFileRecord *flow); - //! Poll the flow file from queue, the expired flow file record also being returned - FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords); - //! Drain the flow records - void drain(); - -protected: - //! A global unique identifier - uuid_t _uuid; - //! Source Processor UUID - uuid_t _srcUUID; - //! Destination Processor UUID - uuid_t _destUUID; - //! Connection Name - std::string _name; - //! Relationship for this connection - Relationship _relationship; - //! Source Processor (ProcessNode/Port) - Processor *_srcProcessor; - //! Destination Processor (ProcessNode/Port) - Processor *_destProcessor; - //! Max queue size to apply back pressure - std::atomic<uint64_t> _maxQueueSize; - //! Max queue data size to apply back pressure - std::atomic<uint64_t> _maxQueueDataSize; - //! Flow File Expiration Duration in= MilliSeconds - std::atomic<uint64_t> _expiredDuration; - - -private: - //! Mutex for protection - std::mutex _mtx; - //! Queued data size - std::atomic<uint64_t> _queuedDataSize; - //! Queue for the Flow File - std::queue<FlowFileRecord *> _queue; - //! Logger - Logger *_logger; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Connection(const Connection &parent); - Connection &operator=(const Connection &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Exception.h ---------------------------------------------------------------------- diff --git a/inc/Exception.h b/inc/Exception.h deleted file mode 100644 index d321454..0000000 --- a/inc/Exception.h +++ /dev/null @@ -1,95 +0,0 @@ -/** - * @file Exception.h - * Exception class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __EXCEPTION_H__ -#define __EXCEPTION_H__ - -#include <sstream> -#include <exception> -#include <stdexcept> -#include <errno.h> -#include <string.h> - -//! ExceptionType -enum ExceptionType -{ - FILE_OPERATION_EXCEPTION = 0, - FLOW_EXCEPTION, - PROCESSOR_EXCEPTION, - PROCESS_SESSION_EXCEPTION, - PROCESS_SCHEDULE_EXCEPTION, - SITE2SITE_EXCEPTION, - GENERAL_EXCEPTION, - MAX_EXCEPTION -}; - -//! Exception String -static const char *ExceptionStr[MAX_EXCEPTION] = -{ - "File Operation", - "Flow File Operation", - "Processor Operation", - "Process Session Operation", - "Process Schedule Operation", - "Site2Site Protocol", - "General Operation" -}; - -//! Exception Type to String -inline const char *ExceptionTypeToString(ExceptionType type) -{ - if (type < MAX_EXCEPTION) - return ExceptionStr[type]; - else - return NULL; -} - -//! Exception Class -class Exception : public std::exception -{ -public: - //! Constructor - /*! - * Create a new flow record - */ - Exception(ExceptionType type, const char *errorMsg) : _type(type), _errorMsg(errorMsg) { - } - //! Destructor - virtual ~Exception() throw () {} - virtual const char * what() const throw () { - - _whatStr = ExceptionTypeToString(_type); - - _whatStr += ":" + _errorMsg; - return _whatStr.c_str(); - } - -protected: - -private: - //! Exception type - ExceptionType _type; - //! Exception detailed information - std::string _errorMsg; - //! Hold the what result - mutable std::string _whatStr; - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/FlowControlProtocol.h ---------------------------------------------------------------------- diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h deleted file mode 100644 index 23f2d49..0000000 --- a/inc/FlowControlProtocol.h +++ /dev/null @@ -1,339 +0,0 @@ -/** - * @file FlowControlProtocol.h - * FlowControlProtocol class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __FLOW_CONTROL_PROTOCOL_H__ -#define __FLOW_CONTROL_PROTOCOL_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> -#include <string> -#include <errno.h> -#include <chrono> -#include <thread> -#include "Logger.h" -#include "Configure.h" -#include "Property.h" - -//! Forwarder declaration -class FlowController; - -#define DEFAULT_NIFI_SERVER_PORT 9000 -#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec -#define MAX_READ_TIMEOUT 30000 // 30 seconds - -//! FlowControl Protocol Msg Type -typedef enum { - REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version - REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval - REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info - REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property - MAX_FLOW_CONTROL_MSG_TYPE -} FlowControlMsgType; - -//! FlowControl Protocol Msg Type String -static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = -{ - "REGISTER_REQ", - "REGISTER_RESP", - "REPORT_REQ", - "REPORT_RESP" -}; - -//! Flow Control Msg Type to String -inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) -{ - if (type < MAX_FLOW_CONTROL_MSG_TYPE) - return FlowControlMsgTypeStr[type]; - else - return NULL; -} - -//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) -typedef enum { - //Fix length 8 bytes: client to server in register request, required field - FLOW_SERIAL_NUMBER, - // Flow XML name TLV: client to server in register request and report request, required field - FLOW_XML_NAME, - // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server - FLOW_XML_CONTENT, - // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field - REPORT_INTERVAL, - // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROCESSOR_NAME, - // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_NAME, - // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property - PROPERTY_VALUE, - // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server - REPORT_BLOB, - MAX_FLOW_MSG_ID -} FlowControlMsgID; - -//! FlowControl Protocol Msg ID String -static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = -{ - "FLOW_SERIAL_NUMBER", - "FLOW_XML_NAME", - "FLOW_XML_CONTENT", - "REPORT_INTERVAL", - "PROCESSOR_NAME" - "PROPERTY_NAME", - "PROPERTY_VALUE", - "REPORT_BLOB" -}; - -#define TYPE_HDR_LEN 4 // Fix Hdr Type -#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes - -//! FlowControl Protocol Msg Len -inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) -{ - if (id == FLOW_SERIAL_NUMBER) - return (TYPE_HDR_LEN + 8); - else if (id == REPORT_INTERVAL) - return (TYPE_HDR_LEN + 4); - else if (id < MAX_FLOW_MSG_ID) - return (TLV_HDR_LEN + payLoadLen); - else - return -1; -} - -//! Flow Control Msg Id to String -inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) -{ - if (id < MAX_FLOW_MSG_ID) - return FlowControlMsgIDStr[id]; - else - return NULL; -} - -//! Flow Control Respond status code -typedef enum { - RESP_SUCCESS, - RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register - RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller - RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller - RESP_FAILURE, - MAX_RESP_CODE -} FlowControlRespCode; - -//! FlowControl Resp Code str -static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = -{ - "RESP_SUCCESS", - "RESP_TRIGGER_REGISTER", - "RESP_START_FLOW_CONTROLLER", - "RESP_STOP_FLOW_CONTROLLER", - "RESP_FAILURE" -}; - -//! Flow Control Resp Code to String -inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) -{ - if (code < MAX_RESP_CODE) - return FlowControlRespCodeStr[code]; - else - return NULL; -} - -//! Common FlowControlProtocol Header -typedef struct { - uint32_t msgType; //! Msg Type - uint32_t seqNumber; //! Seq Number to match Req with Resp - uint32_t status; //! Resp Code, see FlowControlRespCode - uint32_t payloadLen; //! Msg Payload length -} FlowControlProtocolHeader; - -//! FlowControlProtocol Class -class FlowControlProtocol -{ -public: - //! Constructor - /*! - * Create a new control protocol - */ - FlowControlProtocol(FlowController *controller) { - _controller = controller; - _logger = Logger::getLogger(); - _configure = Configure::getConfigure(); - _socket = 0; - _serverName = "localhost"; - _serverPort = DEFAULT_NIFI_SERVER_PORT; - _registered = false; - _seqNumber = 0; - _reportBlob = NULL; - _reportBlobLen = 0; - _reportInterval = DEFAULT_REPORT_INTERVAL; - _running = false; - - std::string value; - - if (_configure->get(Configure::nifi_server_name, value)) - { - _serverName = value; - _logger->log_info("NiFi Server Name %s", _serverName.c_str()); - } - if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort)) - { - _logger->log_info("NiFi Server Port: [%d]", _serverPort); - } - if (_configure->get(Configure::nifi_server_report_interval, value)) - { - TimeUnit unit; - if (Property::StringToTime(value, _reportInterval, unit) && - Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) - { - _logger->log_info("NiFi server report interval: [%d] ms", _reportInterval); - } - } - } - //! Destructor - virtual ~FlowControlProtocol() - { - stop(); - if (_socket) - close(_socket); - if (_reportBlob) - delete [] _reportBlob; - if (this->_thread) - delete this->_thread; - } - -public: - - //! SendRegisterRequest and Process Register Respond, return 0 for success - int sendRegisterReq(); - //! SendReportReq and Process Report Respond, return 0 for success - int sendReportReq(); - //! Start the flow control protocol - void start(); - //! Stop the flow control protocol - void stop(); - //! Set Report BLOB for periodically report - void setReportBlob(char *blob, int len) - { - std::lock_guard<std::mutex> lock(_mtx); - if (_reportBlob && _reportBlobLen >= len) - { - memcpy(_reportBlob, blob, len); - _reportBlobLen = len; - } - else - { - if (_reportBlob) - delete[] _reportBlob; - _reportBlob = new char[len]; - _reportBlobLen = len; - } - } - //! Run function for the thread - static void run(FlowControlProtocol *protocol); - //! set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) - { - memcpy(_serialNumber, number, 8); - } - -protected: - -private: - //! Connect to the socket, return sock descriptor if success, 0 for failure - int connectServer(const char *host, uint16_t port); - //! Send Data via the socket, return -1 for failure - int sendData(uint8_t *buf, int buflen); - //! Read length into buf, return -1 for failure and 0 for EOF - int readData(uint8_t *buf, int buflen); - //! Select on the socket - int selectClient(int msec); - //! Read the header - int readHdr(FlowControlProtocolHeader *hdr); - //! encode uint32_t - uint8_t *encode(uint8_t *buf, uint32_t value) - { - *buf++ = (value & 0xFF000000) >> 24; - *buf++ = (value & 0x00FF0000) >> 16; - *buf++ = (value & 0x0000FF00) >> 8; - *buf++ = (value & 0x000000FF); - return buf; - } - //! encode uint32_t - uint8_t *decode(uint8_t *buf, uint32_t &value) - { - value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); - return (buf + 4); - } - //! encode byte array - uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) - { - memcpy(buf, bufArray, size); - buf += size; - return buf; - } - //! encode std::string - uint8_t *encode(uint8_t *buf, std::string value) - { - // add the \0 for size - buf = encode(buf, value.size()+1); - buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); - return buf; - } - //! Mutex for protection - std::mutex _mtx; - //! Logger - Logger *_logger; - //! Configure - Configure *_configure; - //! NiFi server Name - std::string _serverName; - //! NiFi server port - int64_t _serverPort; - //! Serial Number - uint8_t _serialNumber[8]; - //! socket to server - int _socket; - //! report interal in msec - int64_t _reportInterval; - //! whether it was registered to the NiFi server - bool _registered; - //! seq number - uint32_t _seqNumber; - //! FlowController - FlowController *_controller; - //! report Blob - char *_reportBlob; - //! report Blob len; - int _reportBlobLen; - //! thread - std::thread *_thread; - //! whether it is running - bool _running; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowControlProtocol(const FlowControlProtocol &parent); - FlowControlProtocol &operator=(const FlowControlProtocol &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/FlowController.h ---------------------------------------------------------------------- diff --git a/inc/FlowController.h b/inc/FlowController.h deleted file mode 100644 index 0d758df..0000000 --- a/inc/FlowController.h +++ /dev/null @@ -1,248 +0,0 @@ -/** - * @file FlowController.h - * FlowController class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __FLOW_CONTROLLER_H__ -#define __FLOW_CONTROLLER_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> -#include <set> -#include <libxml/parser.h> -#include <libxml/tree.h> -#include <yaml-cpp/yaml.h> - -#include "Configure.h" -#include "Property.h" -#include "Relationship.h" -#include "FlowFileRecord.h" -#include "Connection.h" -#include "Processor.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include "ProcessGroup.h" -#include "GenerateFlowFile.h" -#include "LogAttribute.h" -#include "RealTimeDataCollector.h" -#include "TimerDrivenSchedulingAgent.h" -#include "FlowControlProtocol.h" -#include "RemoteProcessorGroupPort.h" -#include "GetFile.h" -#include "TailFile.h" -#include "ListenSyslog.h" -#include "ExecuteProcess.h" - -//! Default NiFi Root Group Name -#define DEFAULT_ROOT_GROUP_NAME "" -#define DEFAULT_FLOW_XML_FILE_NAME "conf/flow.xml" -#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml" -#define CONFIG_YAML_PROCESSORS_KEY "Processors" - -enum class ConfigFormat { XML, YAML }; - -struct ProcessorConfig { - std::string name; - std::string javaClass; - std::string maxConcurrentTasks; - std::string schedulingStrategy; - std::string schedulingPeriod; - std::string penalizationPeriod; - std::string yieldPeriod; - std::string runDurationNanos; - std::vector<std::string> autoTerminatedRelationships; - std::vector<Property> properties; -}; - -//! FlowController Class -class FlowController -{ -public: - static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10; - static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5; - //! Constructor - /*! - * Create a new Flow Controller - */ - FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME); - //! Destructor - virtual ~FlowController(); - //! Set FlowController Name - void setName(std::string name) { - _name = name; - } - //! Get Flow Controller Name - std::string getName(void) { - return (_name); - } - //! Set UUID - void setUUID(uuid_t uuid) { - uuid_copy(_uuid, uuid); - } - //! Get UUID - bool getUUID(uuid_t uuid) { - if (uuid) - { - uuid_copy(uuid, _uuid); - return true; - } - else - return false; - } - //! Set MAX TimerDrivenThreads - void setMaxTimerDrivenThreads(int number) - { - _maxTimerDrivenThreads = number; - } - //! Get MAX TimerDrivenThreads - int getMaxTimerDrivenThreads() - { - return _maxTimerDrivenThreads; - } - //! Set MAX EventDrivenThreads - void setMaxEventDrivenThreads(int number) - { - _maxEventDrivenThreads = number; - } - //! Get MAX EventDrivenThreads - int getMaxEventDrivenThreads() - { - return _maxEventDrivenThreads; - } - //! Create FlowFile Repository - bool createFlowFileRepository(); - //! Create Content Repository - bool createContentRepository(); - - //! Life Cycle related function - //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows - void load(ConfigFormat format); - //! Whether the Flow Controller is start running - bool isRunning(); - //! Whether the Flow Controller has already been initialized (loaded flow XML) - bool isInitialized(); - //! Start to run the Flow Controller which internally start the root process group and all its children - bool start(); - //! Stop to run the Flow Controller which internally stop the root process group and all its children - void stop(bool force); - //! Unload the current flow xml, clean the root process group and all its children - void unload(); - //! Load new xml - void reload(std::string xmlFile); - //! update property value - void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) - { - if (_root) - _root->updatePropertyValue(processorName, propertyName, propertyValue); - } - - //! Create Processor (Node/Input/Output Port) based on the name - Processor *createProcessor(std::string name, uuid_t uuid); - //! Create Root Processor Group - ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid); - //! Create Remote Processor Group - ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid); - //! Create Connection - Connection *createConnection(std::string name, uuid_t uuid); - //! set 8 bytes SerialNumber - void setSerialNumber(uint8_t *number) - { - _protocol->setSerialNumber(number); - } - -protected: - - //! A global unique identifier - uuid_t _uuid; - //! FlowController Name - std::string _name; - //! Configuration File Name - std::string _configurationFileName; - //! NiFi property File Name - std::string _propertiesFileName; - //! Root Process Group - ProcessGroup *_root; - //! MAX Timer Driven Threads - int _maxTimerDrivenThreads; - //! MAX Event Driven Threads - int _maxEventDrivenThreads; - //! Config - //! FlowFile Repo - //! Provenance Repo - //! Flow Engines - //! Flow Scheduler - TimerDrivenSchedulingAgent _timerScheduler; - //! Controller Service - //! Config - //! Site to Site Server Listener - //! Heart Beat - //! FlowControl Protocol - FlowControlProtocol *_protocol; - -private: - - //! Mutex for protection - std::mutex _mtx; - //! Logger - Logger *_logger; - //! Configure - Configure *_configure; - //! Whether it is running - std::atomic<bool> _running; - //! Whether it has already been initialized (load the flow XML already) - std::atomic<bool> _initialized; - //! Process Processor Node XML - void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent); - //! Process Port XML - void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction); - //! Process Root Processor Group XML - void parseRootProcessGroup(xmlDoc *doc, xmlNode *node); - //! Process Property XML - void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor); - //! Process connection XML - void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); - //! Process Remote Process Group - void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent); - - //! Process Processor Node YAML - void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent); - //! Process Port YAML - void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction); - //! Process Root Processor Group YAML - void parseRootProcessGroupYaml(YAML::Node rootNode); - //! Process Property YAML - void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor); - //! Process connection YAML - void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent); - //! Process Remote Process Group YAML - void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent); - //! Parse Properties Node YAML for a processor - void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor); - - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowController(const FlowController &parent); - FlowController &operator=(const FlowController &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/FlowFileRecord.h ---------------------------------------------------------------------- diff --git a/inc/FlowFileRecord.h b/inc/FlowFileRecord.h deleted file mode 100644 index 8b7362f..0000000 --- a/inc/FlowFileRecord.h +++ /dev/null @@ -1,220 +0,0 @@ -/** - * @file FlowFileRecord.h - * Flow file record class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __FLOW_FILE_RECORD_H__ -#define __FLOW_FILE_RECORD_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <iostream> -#include <sstream> -#include <fstream> -#include <set> - -#include "TimeUtil.h" -#include "Logger.h" -#include "ResourceClaim.h" - -class ProcessSession; -class Connection; - -#define DEFAULT_FLOWFILE_PATH "." - -//! FlowFile Attribute -enum FlowAttribute -{ - //! The flowfile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename - PATH = 0, - //! The flowfile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename - ABSOLUTE_PATH, - //! The filename of the FlowFile. The filename should not contain any directory structure. - FILENAME, - //! A unique UUID assigned to this FlowFile. - UUID, - //! A numeric value indicating the FlowFile priority - priority, - //! The MIME Type of this FlowFile - MIME_TYPE, - //! Specifies the reason that a FlowFile is being discarded - DISCARD_REASON, - //! Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile. - ALTERNATE_IDENTIFIER, - MAX_FLOW_ATTRIBUTES -}; - -//! FlowFile Attribute Key -static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = -{ - "path", - "absolute.path", - "filename", - "uuid", - "priority", - "mime.type", - "discard.reason", - "alternate.identifier" -}; - -//! FlowFile Attribute Enum to Key -inline const char *FlowAttributeKey(FlowAttribute attribute) -{ - if (attribute < MAX_FLOW_ATTRIBUTES) - return FlowAttributeKeyArray[attribute]; - else - return NULL; -} - -//! FlowFile IO Callback functions for input and output -//! throw exception for error -class InputStreamCallback -{ -public: - virtual void process(std::ifstream *stream) = 0; -}; -class OutputStreamCallback -{ -public: - virtual void process(std::ofstream *stream) = 0; -}; - - -//! FlowFile Record Class -class FlowFileRecord -{ - friend class ProcessSession; -public: - //! Constructor - /*! - * Create a new flow record - */ - FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim = NULL); - //! Destructor - virtual ~FlowFileRecord(); - //! addAttribute key is enum - bool addAttribute(FlowAttribute key, std::string value); - //! addAttribute key is string - bool addAttribute(std::string key, std::string value); - //! removeAttribute key is enum - bool removeAttribute(FlowAttribute key); - //! removeAttribute key is string - bool removeAttribute(std::string key); - //! updateAttribute key is enum - bool updateAttribute(FlowAttribute key, std::string value); - //! updateAttribute key is string - bool updateAttribute(std::string key, std::string value); - //! getAttribute key is enum - bool getAttribute(FlowAttribute key, std::string &value); - //! getAttribute key is string - bool getAttribute(std::string key, std::string &value); - //! setAttribute, if attribute already there, update it, else, add it - void setAttribute(std::string key, std::string value) { - _attributes[key] = value; - } - //! Get the UUID as string - std::string getUUIDStr() { - return _uuidStr; - } - //! Get Attributes - std::map<std::string, std::string> getAttributes() { - return _attributes; - } - //! Check whether it is still being penalized - bool isPenalized() { - return (_penaltyExpirationMs > 0 ? _penaltyExpirationMs > getTimeMillis() : false); - } - //! Get Size - uint64_t getSize() { - return _size; - } - // ! Get Offset - uint64_t getOffset() { - return _offset; - } - // ! Get Entry Date - uint64_t getEntryDate() { - return _entryDate; - } - // ! Get Lineage Start Date - uint64_t getlineageStartDate() { - return _lineageStartDate; - } - // ! Set Original connection - void setOriginalConnection (Connection *connection) { - _orginalConnection = connection; - } - //! Get Resource Claim - ResourceClaim *getResourceClaim() { - return _claim; - } - -protected: - - //! Date at which the flow file entered the flow - uint64_t _entryDate; - //! Date at which the origin of this flow file entered the flow - uint64_t _lineageStartDate; - //! Date at which the flow file was queued - uint64_t _lastQueueDate; - //! Size in bytes of the data corresponding to this flow file - uint64_t _size; - //! A global unique identifier - uuid_t _uuid; - //! A local unique identifier - uint64_t _id; - //! Offset to the content - uint64_t _offset; - //! Penalty expiration - uint64_t _penaltyExpirationMs; - //! Attributes key/values pairs for the flow record - std::map<std::string, std::string> _attributes; - //! Pointer to the associated content resource claim - ResourceClaim *_claim; - //! UUID string - std::string _uuidStr; - //! UUID string for all parents - std::set<std::string> _lineageIdentifiers; - //! duplicate the original flow file - void duplicate(FlowFileRecord *original); - -private: - - //! Local flow sequence ID - static std::atomic<uint64_t> _localFlowSeqNumber; - //! Mark for deletion - bool _markedDelete; - //! Connection queue that this flow file will be transfer or current in - Connection *_connection; - //! Orginal connection queue that this flow file was dequeued from - Connection *_orginalConnection; - //! Logger - Logger *_logger; - //! Snapshot flow record for session rollback - bool _snapshot; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - FlowFileRecord(const FlowFileRecord &parent); - FlowFileRecord &operator=(const FlowFileRecord &parent); - -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/GenerateFlowFile.h ---------------------------------------------------------------------- diff --git a/inc/GenerateFlowFile.h b/inc/GenerateFlowFile.h deleted file mode 100644 index 27aa43b..0000000 --- a/inc/GenerateFlowFile.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * @file GenerateFlowFile.h - * GenerateFlowFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __GENERATE_FLOW_FILE_H__ -#define __GENERATE_FLOW_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! GenerateFlowFile Class -class GenerateFlowFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - GenerateFlowFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _data = NULL; - _dataSize = 0; - } - //! Destructor - virtual ~GenerateFlowFile() - { - if (_data) - delete[] _data; - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property FileSize; - static Property BatchSize; - static Property DataFormat; - static Property UniqueFlowFiles; - static const char *DATA_FORMAT_BINARY; - static const char *DATA_FORMAT_TEXT; - //! Supported Relationships - static Relationship Success; - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi GenerateFlowFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi GenerateFlowFile - virtual void initialize(void); - -protected: - -private: - //! Generated data - char * _data; - //! Size of the generate data - uint64_t _dataSize; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/GetFile.h ---------------------------------------------------------------------- diff --git a/inc/GetFile.h b/inc/GetFile.h deleted file mode 100644 index eb975fd..0000000 --- a/inc/GetFile.h +++ /dev/null @@ -1,117 +0,0 @@ -/** - * @file GetFile.h - * GetFile class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __GET_FILE_H__ -#define __GET_FILE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! GetFile Class -class GetFile : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - GetFile(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - _directory = "."; - _recursive = true; - _keepSourceFile = false; - _minAge = 0; - _maxAge = 0; - _minSize = 0; - _maxSize = 0; - _ignoreHiddenFile = true; - _pollInterval = 0; - _batchSize = 10; - _lastDirectoryListingTime = getTimeMillis(); - _fileFilter = "[^\\.].*"; - } - //! Destructor - virtual ~GetFile() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property Directory; - static Property Recurse; - static Property KeepSourceFile; - static Property MinAge; - static Property MaxAge; - static Property MinSize; - static Property MaxSize; - static Property IgnoreHiddenFile; - static Property PollInterval; - static Property BatchSize; - static Property FileFilter; - //! Supported Relationships - static Relationship Success; - -public: - //! OnTrigger method, implemented by NiFi GetFile - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi GetFile - virtual void initialize(void); - //! perform directory listing - void performListing(std::string dir); - -protected: - -private: - //! Logger - Logger *_logger; - //! Queue for store directory list - std::queue<std::string> _dirList; - //! Get Listing size - uint64_t getListingSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _dirList.size(); - } - //! Whether the directory listing is empty - bool isListingEmpty(); - //! Put full path file name into directory listing - void putListing(std::string fileName); - //! Poll directory listing for files - void pollListing(std::queue<std::string> &list, int maxSize); - //! Check whether file can be added to the directory listing - bool acceptFile(std::string fileName); - //! Mutex for protection of the directory listing - std::mutex _mtx; - std::string _directory; - bool _recursive; - bool _keepSourceFile; - int64_t _minAge; - int64_t _maxAge; - int64_t _minSize; - int64_t _maxSize; - bool _ignoreHiddenFile; - int64_t _pollInterval; - int64_t _batchSize; - uint64_t _lastDirectoryListingTime; - std::string _fileFilter; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/inc/ListenSyslog.h b/inc/ListenSyslog.h deleted file mode 100644 index 81bc92c..0000000 --- a/inc/ListenSyslog.h +++ /dev/null @@ -1,209 +0,0 @@ -/** - * @file ListenSyslog.h - * ListenSyslog class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LISTEN_SYSLOG_H__ -#define __LISTEN_SYSLOG_H__ - -#include <stdio.h> -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <errno.h> -#include <sys/select.h> -#include <sys/time.h> -#include <sys/types.h> -#include <chrono> -#include <thread> -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! SyslogEvent -typedef struct { - uint8_t *payload; - uint64_t len; -} SysLogEvent; - -//! ListenSyslog Class -class ListenSyslog : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - ListenSyslog(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - _eventQueueByteSize = 0; - _serverSocket = 0; - _recvBufSize = 65507; - _maxSocketBufSize = 1024*1024; - _maxConnections = 2; - _maxBatchSize = 1; - _messageDelimiter = "\n"; - _protocol = "UDP"; - _port = 514; - _parseMessages = false; - _serverSocket = 0; - _maxFds = 0; - FD_ZERO(&_readfds); - _thread = NULL; - _resetServerSocket = false; - _serverTheadRunning = false; - } - //! Destructor - virtual ~ListenSyslog() - { - _serverTheadRunning = false; - if (this->_thread) - delete this->_thread; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) - { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) - { - _logger->log_info("ListenSysLog Server socket %d close", _serverSocket); - close(_serverSocket); - _serverSocket = 0; - } - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property RecvBufSize; - static Property MaxSocketBufSize; - static Property MaxConnections; - static Property MaxBatchSize; - static Property MessageDelimiter; - static Property ParseMessages; - static Property Protocol; - static Property Port; - //! Supported Relationships - static Relationship Success; - static Relationship Invalid; - //! Nest Callback Class for write stream - class WriteCallback : public OutputStreamCallback - { - public: - WriteCallback(char *data, uint64_t size) - : _data(data), _dataSize(size) {} - char *_data; - uint64_t _dataSize; - void process(std::ofstream *stream) { - if (_data && _dataSize > 0) - stream->write(_data, _dataSize); - } - }; - -public: - //! OnTrigger method, implemented by NiFi ListenSyslog - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi ListenSyslog - virtual void initialize(void); - -protected: - -private: - //! Logger - Logger *_logger; - //! Run function for the thread - static void run(ListenSyslog *process); - //! Run Thread - void runThread(); - //! Queue for store syslog event - std::queue<SysLogEvent> _eventQueue; - //! Size of Event queue in bytes - uint64_t _eventQueueByteSize; - //! Get event queue size - uint64_t getEventQueueSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueue.size(); - } - //! Get event queue byte size - uint64_t getEventQueueByteSize() { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueueByteSize; - } - //! Whether the event queue is empty - bool isEventQueueEmpty() - { - std::lock_guard<std::mutex> lock(_mtx); - return _eventQueue.empty(); - } - //! Put event into directory listing - void putEvent(uint8_t *payload, uint64_t len) - { - std::lock_guard<std::mutex> lock(_mtx); - SysLogEvent event; - event.payload = payload; - event.len = len; - _eventQueue.push(event); - _eventQueueByteSize += len; - } - //! Read \n terminated line from TCP socket - int readline( int fd, char *bufptr, size_t len ); - //! start server socket and handling client socket - void startSocketThread(); - //! Poll event - void pollEvent(std::queue<SysLogEvent> &list, int maxSize) - { - std::lock_guard<std::mutex> lock(_mtx); - - while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) - { - SysLogEvent event = _eventQueue.front(); - _eventQueue.pop(); - _eventQueueByteSize -= event.len; - list.push(event); - } - return; - } - //! Mutex for protection of the directory listing - std::mutex _mtx; - int64_t _recvBufSize; - int64_t _maxSocketBufSize; - int64_t _maxConnections; - int64_t _maxBatchSize; - std::string _messageDelimiter; - std::string _protocol; - int64_t _port; - bool _parseMessages; - int _serverSocket; - std::vector<int> _clientSockets; - int _maxFds; - fd_set _readfds; - //! thread - std::thread *_thread; - //! whether to reset the server socket - bool _resetServerSocket; - bool _serverTheadRunning; - //! buffer for read socket - uint8_t _buffer[2048]; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/LogAttribute.h ---------------------------------------------------------------------- diff --git a/inc/LogAttribute.h b/inc/LogAttribute.h deleted file mode 100644 index 125ebf3..0000000 --- a/inc/LogAttribute.h +++ /dev/null @@ -1,128 +0,0 @@ -/** - * @file LogAttribute.h - * LogAttribute class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LOG_ATTRIBUTE_H__ -#define __LOG_ATTRIBUTE_H__ - -#include "FlowFileRecord.h" -#include "Processor.h" -#include "ProcessSession.h" - -//! LogAttribute Class -class LogAttribute : public Processor -{ -public: - //! Constructor - /*! - * Create a new processor - */ - LogAttribute(std::string name, uuid_t uuid = NULL) - : Processor(name, uuid) - { - _logger = Logger::getLogger(); - } - //! Destructor - virtual ~LogAttribute() - { - } - //! Processor Name - static const std::string ProcessorName; - //! Supported Properties - static Property LogLevel; - static Property AttributesToLog; - static Property AttributesToIgnore; - static Property LogPayload; - static Property LogPrefix; - //! Supported Relationships - static Relationship Success; - enum LogAttrLevel { - LogAttrLevelTrace, LogAttrLevelDebug, LogAttrLevelInfo, LogAttrLevelWarn, LogAttrLevelError - }; - //! Convert log level from string to enum - bool logLevelStringToEnum(std::string logStr, LogAttrLevel &level) - { - if (logStr == "trace") - { - level = LogAttrLevelTrace; - return true; - } - else if (logStr == "debug") - { - level = LogAttrLevelDebug; - return true; - } - else if (logStr == "info") - { - level = LogAttrLevelInfo; - return true; - } - else if (logStr == "warn") - { - level = LogAttrLevelWarn; - return true; - } - else if (logStr == "error") - { - level = LogAttrLevelError; - return true; - } - else - return false; - } - //! Nest Callback Class for read stream - class ReadCallback : public InputStreamCallback - { - public: - ReadCallback(uint64_t size) - { - _bufferSize = size; - _buffer = new char[_bufferSize]; - } - ~ReadCallback() - { - if (_buffer) - delete[] _buffer; - } - void process(std::ifstream *stream) { - - stream->read(_buffer, _bufferSize); - if (!stream) - _readSize = stream->gcount(); - else - _readSize = _bufferSize; - } - char *_buffer; - uint64_t _bufferSize; - uint64_t _readSize; - }; - -public: - //! OnTrigger method, implemented by NiFi LogAttribute - virtual void onTrigger(ProcessContext *context, ProcessSession *session); - //! Initialize, over write by NiFi LogAttribute - virtual void initialize(void); - -protected: - -private: - //! Logger - Logger *_logger; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/Logger.h ---------------------------------------------------------------------- diff --git a/inc/Logger.h b/inc/Logger.h deleted file mode 100644 index 3edad9d..0000000 --- a/inc/Logger.h +++ /dev/null @@ -1,154 +0,0 @@ -/** - * @file Logger.h - * Logger class declaration - * This is a C++ wrapper for spdlog, a lightweight C++ logging library - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __LOGGER_H__ -#define __LOGGER_H__ - -#include "spdlog/spdlog.h" - -using spdlog::stdout_logger_mt; -using spdlog::rotating_logger_mt; -using spdlog::logger; - -#define LOG_BUFFER_SIZE 1024 -#define FILL_BUFFER char buffer[LOG_BUFFER_SIZE]; \ - va_list args; \ - va_start(args, format); \ - vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \ - va_end(args); - -//! 5M default log file size -#define DEFAULT_LOG_FILE_SIZE (5*1024*1024) -//! 3 log files rotation -#define DEFAULT_LOG_FILE_NUMBER 3 -#define LOG_NAME "minifi log" -#define LOG_FILE_NAME "minifi-app.log" - -typedef enum -{ - trace = 0, - debug = 1, - info = 2, - notice = 3, - warn = 4, - err = 5, - critical = 6, - alert = 7, - emerg = 8, - off = 9 -} LOG_LEVEL_E; - -//! Logger Class -class Logger { - -public: - - //! Get the singleton logger instance - static Logger * getLogger() { - if (!_logger) - _logger = new Logger(); - return _logger; - } - void setLogLevel(LOG_LEVEL_E level) { - if (_spdlog == NULL) - return; - _spdlog->set_level((spdlog::level::level_enum) level); - } - //! Destructor - ~Logger() {} - /** - * @brief Log error message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_error(const char *const format, ...) { - if(_spdlog == NULL) - return; - FILL_BUFFER - _spdlog->error(buffer); - } - /** - * @brief Log warn message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_warn(const char *const format, ...) { - if(_spdlog == NULL) - return; - FILL_BUFFER - _spdlog->warn(buffer); - } - /** - * @brief Log info message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_info(const char *const format, ...) { - if(_spdlog == NULL) - return; - FILL_BUFFER - _spdlog->info(buffer); - } - /** - * @brief Log debug message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_debug(const char *const format, ...) { - if(_spdlog == NULL) - return; - FILL_BUFFER - _spdlog->debug(buffer); - } - /** - * @brief Log trace message - * @param format format string ('man printf' for syntax) - * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match - */ - void log_trace(const char *const format, ...) { - if(_spdlog == NULL) - return; - FILL_BUFFER - _spdlog->trace(buffer); - } - -protected: - -private: - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Logger(const Logger &parent); - Logger &operator=(const Logger &parent); - //! Constructor - /*! - * Create a logger - * */ - Logger(const std::string logger_name = LOG_NAME, const std::string filename = LOG_FILE_NAME, size_t max_file_size = DEFAULT_LOG_FILE_SIZE, size_t max_files = DEFAULT_LOG_FILE_NUMBER, bool force_flush = true) { - _spdlog = rotating_logger_mt(logger_name, filename, max_file_size, max_files, force_flush); - _spdlog->set_level((spdlog::level::level_enum) debug); - } - //! spdlog - std::shared_ptr<logger> _spdlog; - - //! Singleton logger instance - static Logger *_logger; -}; - -#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/inc/ProcessContext.h ---------------------------------------------------------------------- diff --git a/inc/ProcessContext.h b/inc/ProcessContext.h deleted file mode 100644 index 2a88b93..0000000 --- a/inc/ProcessContext.h +++ /dev/null @@ -1,99 +0,0 @@ -/** - * @file ProcessContext.h - * ProcessContext class declaration - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef __PROCESS_CONTEXT_H__ -#define __PROCESS_CONTEXT_H__ - -#include <uuid/uuid.h> -#include <vector> -#include <queue> -#include <map> -#include <mutex> -#include <atomic> -#include <algorithm> - -#include "Logger.h" -#include "Processor.h" - -//! ProcessContext Class -class ProcessContext -{ -public: - //! Constructor - /*! - * Create a new process context associated with the processor/controller service/state manager - */ - ProcessContext(Processor *processor = NULL) : _processor(processor) { - _logger = Logger::getLogger(); - } - //! Destructor - virtual ~ProcessContext() {} - //! Get Processor associated with the Process Context - Processor *getProcessor() { - return _processor; - } - bool getProperty(std::string name, std::string &value) { - if (_processor) - return _processor->getProperty(name, value); - else - return false; - } - //! Whether the relationship is supported - bool isSupportedRelationship(Relationship relationship) { - if (_processor) - return _processor->isSupportedRelationship(relationship); - else - return false; - } - //! Check whether the relationship is auto terminated - bool isAutoTerminated(Relationship relationship) { - if (_processor) - return _processor->isAutoTerminated(relationship); - else - return false; - } - //! Get ProcessContext Maximum Concurrent Tasks - uint8_t getMaxConcurrentTasks(void) { - if (_processor) - return _processor->getMaxConcurrentTasks(); - else - return 0; - } - //! Yield based on the yield period - void yield() { - if (_processor) - _processor->yield(); - } - -protected: - -private: - - //! Processor - Processor *_processor; - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - ProcessContext(const ProcessContext &parent); - ProcessContext &operator=(const ProcessContext &parent); - //! Logger - Logger *_logger; - -}; - -#endif