Repository: nifi-minifi-cpp Updated Branches: refs/heads/master fd280b5c4 -> 5c252277d
MINIFICPP-31 Added UpdateAttribute processor This closes #262. Signed-off-by: Marc Parisi <phroc...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5c252277 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5c252277 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5c252277 Branch: refs/heads/master Commit: 5c252277d2e1cf3c6eea1aef2247c7860b39862b Parents: fd280b5 Author: Andrew I. Christianson <a...@andyic.org> Authored: Thu Feb 8 13:51:08 2018 -0500 Committer: Marc Parisi <phroc...@apache.org> Committed: Mon Feb 12 13:16:51 2018 -0500 ---------------------------------------------------------------------- .../expression-language/ProcessContextExpr.cpp | 6 +- libminifi/include/core/ConfigurableComponent.h | 2 +- libminifi/include/core/ProcessContext.h | 3 +- libminifi/include/core/ProcessorNode.h | 47 ++++++++++ libminifi/include/processors/UpdateAttribute.h | 78 +++++++++++++++++ libminifi/src/core/ConfigurableComponent.cpp | 2 +- libminifi/src/processors/UpdateAttribute.cpp | 92 ++++++++++++++++++++ libminifi/test/TestBase.cpp | 15 ++-- libminifi/test/TestBase.h | 5 +- libminifi/test/unit/DynamicPropertyTests.cpp | 2 +- libminifi/test/unit/UpdateAttributeTests.cpp | 55 ++++++++++++ 11 files changed, 293 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/extensions/expression-language/ProcessContextExpr.cpp ---------------------------------------------------------------------- diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp index ef4bc4b..c52b18c 100644 --- a/extensions/expression-language/ProcessContextExpr.cpp +++ b/extensions/expression-language/ProcessContextExpr.cpp @@ -38,14 +38,14 @@ bool ProcessContext::getProperty(const std::string &name, std::string &value, bool ProcessContext::getDynamicProperty(const std::string &name, std::string &value, const std::shared_ptr<FlowFile> &flow_file) { - if (expressions_.find(name) == expressions_.end()) { + if (dynamic_property_expressions_.find(name) == dynamic_property_expressions_.end()) { std::string expression_str; getDynamicProperty(name, expression_str); logger_->log_debug("Compiling expression for %s/%s: %s", getProcessorNode()->getName(), name, expression_str); - expressions_.emplace(name, expression::compile(expression_str)); + dynamic_property_expressions_.emplace(name, expression::compile(expression_str)); } - value = expressions_[name]({flow_file}); + value = dynamic_property_expressions_[name]({flow_file}); return true; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ConfigurableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h index fef78e9..22ac434 100644 --- a/libminifi/include/core/ConfigurableComponent.h +++ b/libminifi/include/core/ConfigurableComponent.h @@ -131,7 +131,7 @@ class __attribute__((visibility("default"))) ConfigurableComponent { * * @return vector of property keys */ - std::vector<std::string> getDynamicProperyKeys(); + std::vector<std::string> getDynamicPropertyKeys(); virtual ~ConfigurableComponent(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index e67e412..e7f70b7 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -75,7 +75,7 @@ class ProcessContext : public controller::ControllerServiceLookup { } bool getDynamicProperty(const std::string &name, std::string &value, const std::shared_ptr<FlowFile> &flow_file); std::vector<std::string> getDynamicPropertyKeys() { - return processor_node_->getDynamicProperyKeys(); + return processor_node_->getDynamicPropertyKeys(); } // Sets the property value using the property's string name bool setProperty(const std::string &name, std::string value) { @@ -185,6 +185,7 @@ class ProcessContext : public controller::ControllerServiceLookup { std::shared_ptr<ProcessorNode> processor_node_; std::map<std::string, org::apache::nifi::minifi::expression::Expression> expressions_; + std::map<std::string, org::apache::nifi::minifi::expression::Expression> dynamic_property_expressions_; // Logger std::shared_ptr<logging::Logger> logger_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ProcessorNode.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessorNode.h b/libminifi/include/core/ProcessorNode.h index 4b99a71..ed44d6a 100644 --- a/libminifi/include/core/ProcessorNode.h +++ b/libminifi/include/core/ProcessorNode.h @@ -86,6 +86,53 @@ class ProcessorNode : public ConfigurableComponent, public Connectable { } /** + * Get dynamic property using the provided name. + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ + bool getDynamicProperty(const std::string name, std::string &value) { + const auto &processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + if (processor_cast) { + return processor_cast->getDynamicProperty(name, value); + } else { + return ConfigurableComponent::getDynamicProperty(name, value); + } + } + + /** + * Sets the dynamic property using the provided name + * @param property name + * @param value property value. + * @return result of setting property. + */ + bool setDynamicProperty(const std::string name, std::string value) { + const auto &processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + auto ret = ConfigurableComponent::setDynamicProperty(name, value); + + if (processor_cast) { + ret = processor_cast->setDynamicProperty(name, value); + } + + return ret; + } + + /** + * Gets list of dynamic property keys + * @param name property name. + * @param value value passed in by reference + * @return result of getting property. + */ + std::vector<std::string> getDynamicPropertyKeys() { + const auto &processor_cast = std::dynamic_pointer_cast<ConfigurableComponent>(processor_); + if (processor_cast) { + return processor_cast->getDynamicPropertyKeys(); + } else { + return ConfigurableComponent::getDynamicPropertyKeys(); + } + } + + /** * Sets the property using the provided name * @param property name * @param value property value. http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/processors/UpdateAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/UpdateAttribute.h b/libminifi/include/processors/UpdateAttribute.h new file mode 100644 index 0000000..117c78a --- /dev/null +++ b/libminifi/include/processors/UpdateAttribute.h @@ -0,0 +1,78 @@ +/** + * @file UpdateAttribute.h + * UpdateAttribute class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __UPDATE_ATTRIBUTE_H__ +#define __UPDATE_ATTRIBUTE_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class UpdateAttribute : public core::Processor { + public: + + UpdateAttribute(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory<UpdateAttribute>::getLogger()) { + } + + /** + * Relationships + */ + + static core::Relationship Success; + static core::Relationship Failure; + + /** + * NiFi API implementation + */ + + virtual bool supportsDynamicProperties() { + return true; + }; + + virtual void onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory); + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session); + virtual void initialize(void); + + private: + std::shared_ptr<logging::Logger> logger_; + std::vector<std::string> attributes_; +}; + +REGISTER_RESOURCE(UpdateAttribute); + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* __UPDATE_ATTRIBUTE_H__ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index c534c6d..92e99a3 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -221,7 +221,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const } } -std::vector<std::string> ConfigurableComponent::getDynamicProperyKeys() { +std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys() { std::lock_guard<std::mutex> lock(configuration_mutex_); std::vector<std::string> result; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/src/processors/UpdateAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/UpdateAttribute.cpp b/libminifi/src/processors/UpdateAttribute.cpp new file mode 100644 index 0000000..f431012 --- /dev/null +++ b/libminifi/src/processors/UpdateAttribute.cpp @@ -0,0 +1,92 @@ +/** + * @file UpdateAttribute.cpp + * UpdateAttribute class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "processors/UpdateAttribute.h" + +#include <memory> +#include <string> +#include <set> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Relationship UpdateAttribute::Success( + "success", + "All files are routed to success"); +core::Relationship UpdateAttribute::Failure( + "failure", + "Failed files are transferred to failure"); + +void UpdateAttribute::initialize() { + std::set<core::Property> properties; + setSupportedProperties(properties); + + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void UpdateAttribute::onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory) { + attributes_.clear(); + const auto &dynamic_prop_keys = context->getDynamicPropertyKeys(); + logger_->log_info("UpdateAttribute registering %d keys", dynamic_prop_keys.size()); + + for (const auto &key : dynamic_prop_keys) { + attributes_.emplace_back(key); + logger_->log_info("UpdateAttribute registered attribute '%s'", key); + } +} + +void UpdateAttribute::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + auto flow_file = session->get(); + + // Do nothing if there are no incoming files + if (!flow_file) { + return; + } + + try { + for (const auto &attribute : attributes_) { + std::string value; + context->getDynamicProperty(attribute, value, flow_file); + flow_file->setAttribute(attribute, value); + logger_->log_info("Set attribute '%s' of flow file '%s' with value '%s'", + attribute, + flow_file->getUUIDStr(), value); + } + session->transfer(flow_file, Success); + } catch (const std::exception &e) { + logger_->log_error("Caught exception while updating attributes: %s", e.what()); + session->transfer(flow_file, Failure); + yield(); + } +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/TestBase.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index a384d21..950d8bb 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -17,10 +17,6 @@ */ #include "./TestBase.h" -#include <memory> -#include <vector> -#include <set> -#include <string> TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) : @@ -115,7 +111,10 @@ bool linkToPrevious) { return addProcessor(processor, name, relationship, linkToPrevious); } -bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { +bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, + const std::string &prop, + const std::string &value, + bool dynamic) { std::lock_guard<std::recursive_mutex> guard(mutex); uint32_t i = 0; logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName()); @@ -129,7 +128,11 @@ bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const st return false; } - return processor_contexts_.at(i)->setProperty(prop, value); + if (dynamic) { + return processor_contexts_.at(i)->setDynamicProperty(prop, value); + } else { + return processor_contexts_.at(i)->setProperty(prop, value); + } } void TestPlan::reset() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index c6cebb2..77449cb 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -160,7 +160,10 @@ class TestPlan { std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false); - bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value); + bool setProperty(const std::shared_ptr<core::Processor> proc, + const std::string &prop, + const std::string &value, + bool dynamic = false); void reset(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/unit/DynamicPropertyTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/DynamicPropertyTests.cpp b/libminifi/test/unit/DynamicPropertyTests.cpp index fbc9dfd..f4391b1 100644 --- a/libminifi/test/unit/DynamicPropertyTests.cpp +++ b/libminifi/test/unit/DynamicPropertyTests.cpp @@ -78,7 +78,7 @@ TEST_CASE("Test Set Dynamic Property 3", "[testSetDynamicProperty2]") { component.setDynamicProperty("test", "value"); component.setDynamicProperty("test2", "value2"); std::string value; - auto propertyKeys = component.getDynamicProperyKeys(); + auto propertyKeys = component.getDynamicPropertyKeys(); REQUIRE(2 == propertyKeys.size()); REQUIRE("test" == propertyKeys[0]); REQUIRE("test2" == propertyKeys[1]); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/unit/UpdateAttributeTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/UpdateAttributeTests.cpp b/libminifi/test/unit/UpdateAttributeTests.cpp new file mode 100644 index 0000000..df1cf21 --- /dev/null +++ b/libminifi/test/unit/UpdateAttributeTests.cpp @@ -0,0 +1,55 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "../TestBase.h" +#include "processors/LogAttribute.h" +#include "processors/UpdateAttribute.h" +#include "processors/GenerateFlowFile.h" + +TEST_CASE("UpdateAttributeTest", "[updateAttributeTest]") { + TestController testController; + + LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>(); + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + + const auto &generate_proc = plan->addProcessor("GenerateFlowFile", + "generate"); + const auto &update_proc = plan->addProcessor("UpdateAttribute", + "update", + core::Relationship("success", "description"), + true); + const auto &log_proc = plan->addProcessor("LogAttribute", + "log", + core::Relationship("success", "description"), + true); + + plan->setProperty(update_proc, "test_attr_1", "test_val_1", true); + plan->setProperty(update_proc, "test_attr_2", "test_val_${literal(1):plus(1)}", true); + + testController.runSession(plan, false); // generate + testController.runSession(plan, false); // update + testController.runSession(plan, false); // log + + REQUIRE(LogTestController::getInstance().contains("key:test_attr_1 value:test_val_1")); + REQUIRE(LogTestController::getInstance().contains("key:test_attr_2 value:test_val_2")); + + LogTestController::getInstance().reset(); +} +