Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master fd280b5c4 -> 5c252277d


MINIFICPP-31 Added UpdateAttribute processor

This closes #262.

Signed-off-by: Marc Parisi <phroc...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/5c252277
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/5c252277
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/5c252277

Branch: refs/heads/master
Commit: 5c252277d2e1cf3c6eea1aef2247c7860b39862b
Parents: fd280b5
Author: Andrew I. Christianson <a...@andyic.org>
Authored: Thu Feb 8 13:51:08 2018 -0500
Committer: Marc Parisi <phroc...@apache.org>
Committed: Mon Feb 12 13:16:51 2018 -0500

----------------------------------------------------------------------
 .../expression-language/ProcessContextExpr.cpp  |  6 +-
 libminifi/include/core/ConfigurableComponent.h  |  2 +-
 libminifi/include/core/ProcessContext.h         |  3 +-
 libminifi/include/core/ProcessorNode.h          | 47 ++++++++++
 libminifi/include/processors/UpdateAttribute.h  | 78 +++++++++++++++++
 libminifi/src/core/ConfigurableComponent.cpp    |  2 +-
 libminifi/src/processors/UpdateAttribute.cpp    | 92 ++++++++++++++++++++
 libminifi/test/TestBase.cpp                     | 15 ++--
 libminifi/test/TestBase.h                       |  5 +-
 libminifi/test/unit/DynamicPropertyTests.cpp    |  2 +-
 libminifi/test/unit/UpdateAttributeTests.cpp    | 55 ++++++++++++
 11 files changed, 293 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/extensions/expression-language/ProcessContextExpr.cpp
----------------------------------------------------------------------
diff --git a/extensions/expression-language/ProcessContextExpr.cpp 
b/extensions/expression-language/ProcessContextExpr.cpp
index ef4bc4b..c52b18c 100644
--- a/extensions/expression-language/ProcessContextExpr.cpp
+++ b/extensions/expression-language/ProcessContextExpr.cpp
@@ -38,14 +38,14 @@ bool ProcessContext::getProperty(const std::string &name, 
std::string &value,
 
 bool ProcessContext::getDynamicProperty(const std::string &name, std::string 
&value,
                                  const std::shared_ptr<FlowFile> &flow_file) {
-  if (expressions_.find(name) == expressions_.end()) {
+  if (dynamic_property_expressions_.find(name) == 
dynamic_property_expressions_.end()) {
     std::string expression_str;
     getDynamicProperty(name, expression_str);
     logger_->log_debug("Compiling expression for %s/%s: %s", 
getProcessorNode()->getName(), name, expression_str);
-    expressions_.emplace(name, expression::compile(expression_str));
+    dynamic_property_expressions_.emplace(name, 
expression::compile(expression_str));
   }
 
-  value = expressions_[name]({flow_file});
+  value = dynamic_property_expressions_[name]({flow_file});
   return true;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ConfigurableComponent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ConfigurableComponent.h 
b/libminifi/include/core/ConfigurableComponent.h
index fef78e9..22ac434 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -131,7 +131,7 @@ class __attribute__((visibility("default"))) 
ConfigurableComponent {
    *
    * @return vector of property keys
    */
-  std::vector<std::string> getDynamicProperyKeys();
+  std::vector<std::string> getDynamicPropertyKeys();
 
   virtual ~ConfigurableComponent();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ProcessContext.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessContext.h 
b/libminifi/include/core/ProcessContext.h
index e67e412..e7f70b7 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -75,7 +75,7 @@ class ProcessContext : public 
controller::ControllerServiceLookup {
   }
   bool getDynamicProperty(const std::string &name, std::string &value, const 
std::shared_ptr<FlowFile> &flow_file);
   std::vector<std::string> getDynamicPropertyKeys() {
-    return processor_node_->getDynamicProperyKeys();
+    return processor_node_->getDynamicPropertyKeys();
   }
   // Sets the property value using the property's string name
   bool setProperty(const std::string &name, std::string value) {
@@ -185,6 +185,7 @@ class ProcessContext : public 
controller::ControllerServiceLookup {
   std::shared_ptr<ProcessorNode> processor_node_;
 
   std::map<std::string, org::apache::nifi::minifi::expression::Expression> 
expressions_;
+  std::map<std::string, org::apache::nifi::minifi::expression::Expression> 
dynamic_property_expressions_;
 
   // Logger
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h 
b/libminifi/include/core/ProcessorNode.h
index 4b99a71..ed44d6a 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -86,6 +86,53 @@ class ProcessorNode : public ConfigurableComponent, public 
Connectable {
   }
 
   /**
+   * Get dynamic property using the provided name.
+   * @param name property name.
+   * @param value value passed in by reference
+   * @return result of getting property.
+   */
+  bool getDynamicProperty(const std::string name, std::string &value) {
+    const auto &processor_cast = 
std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    if (processor_cast) {
+      return processor_cast->getDynamicProperty(name, value);
+    } else {
+      return ConfigurableComponent::getDynamicProperty(name, value);
+    }
+  }
+
+  /**
+   * Sets the dynamic property using the provided name
+   * @param name property name.
+   * @param value property value.
+   * @return result of setting property.
+   */
+  bool setDynamicProperty(const std::string name, std::string value) {
+    const auto &processor_cast = 
std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    auto ret = ConfigurableComponent::setDynamicProperty(name, value);
+
+    if (processor_cast) {
+      ret = processor_cast->setDynamicProperty(name, value);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Gets the list of dynamic property keys.
+   * Delegates to processor_ when it casts to a ConfigurableComponent;
+   * otherwise falls back to this node's own ConfigurableComponent keys.
+   * @return vector of dynamic property keys.
+   */
+  std::vector<std::string> getDynamicPropertyKeys() {
+    const auto &processor_cast = 
std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
+    if (processor_cast) {
+      return processor_cast->getDynamicPropertyKeys();
+    } else {
+      return ConfigurableComponent::getDynamicPropertyKeys();
+    }
+  }
+
+  /**
    * Sets the property using the provided name
    * @param property name
    * @param value property value.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/include/processors/UpdateAttribute.h
----------------------------------------------------------------------
diff --git a/libminifi/include/processors/UpdateAttribute.h 
b/libminifi/include/processors/UpdateAttribute.h
new file mode 100644
index 0000000..117c78a
--- /dev/null
+++ b/libminifi/include/processors/UpdateAttribute.h
@@ -0,0 +1,78 @@
+/**
+ * @file UpdateAttribute.h
+ * UpdateAttribute class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UPDATE_ATTRIBUTE_H__
+#define __UPDATE_ATTRIBUTE_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class UpdateAttribute : public core::Processor {
+ public:
+
+  UpdateAttribute(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<UpdateAttribute>::getLogger()) {
+  }
+
+  /**
+   * Relationships
+   */
+
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  /**
+   * NiFi API implementation
+   */
+
+  virtual bool supportsDynamicProperties() {
+    return true;
+  };
+
+  virtual void onSchedule(core::ProcessContext *context,
+                          core::ProcessSessionFactory *sessionFactory);
+  virtual void onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session);
+  virtual void initialize(void);
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::vector<std::string> attributes_;
+};
+
+REGISTER_RESOURCE(UpdateAttribute);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* __UPDATE_ATTRIBUTE_H__ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp 
b/libminifi/src/core/ConfigurableComponent.cpp
index c534c6d..92e99a3 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -221,7 +221,7 @@ bool ConfigurableComponent::updateDynamicProperty(const 
std::string &name, const
   }
 }
 
-std::vector<std::string> ConfigurableComponent::getDynamicProperyKeys()  {
+std::vector<std::string> ConfigurableComponent::getDynamicPropertyKeys()  {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   std::vector<std::string> result;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/src/processors/UpdateAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/UpdateAttribute.cpp 
b/libminifi/src/processors/UpdateAttribute.cpp
new file mode 100644
index 0000000..f431012
--- /dev/null
+++ b/libminifi/src/processors/UpdateAttribute.cpp
@@ -0,0 +1,92 @@
+/**
+ * @file UpdateAttribute.cpp
+ * UpdateAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "processors/UpdateAttribute.h"
+
+#include <memory>
+#include <string>
+#include <set>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship UpdateAttribute::Success(
+    "success",
+    "All files are routed to success");
+core::Relationship UpdateAttribute::Failure(
+    "failure",
+    "Failed files are transferred to failure");
+
+void UpdateAttribute::initialize() {
+  std::set<core::Property> properties;
+  setSupportedProperties(properties);
+
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void UpdateAttribute::onSchedule(core::ProcessContext *context,
+                                 core::ProcessSessionFactory *sessionFactory) {
+  attributes_.clear();
+  const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
+  logger_->log_info("UpdateAttribute registering %d keys", 
dynamic_prop_keys.size());
+
+  for (const auto &key : dynamic_prop_keys) {
+    attributes_.emplace_back(key);
+    logger_->log_info("UpdateAttribute registered attribute '%s'", key);
+  }
+}
+
+void UpdateAttribute::onTrigger(core::ProcessContext *context,
+                                core::ProcessSession *session) {
+  auto flow_file = session->get();
+
+  // Do nothing if there are no incoming files
+  if (!flow_file) {
+    return;
+  }
+
+  try {
+    for (const auto &attribute : attributes_) {
+      std::string value;
+      context->getDynamicProperty(attribute, value, flow_file);
+      flow_file->setAttribute(attribute, value);
+      logger_->log_info("Set attribute '%s' of flow file '%s' with value '%s'",
+                        attribute,
+                        flow_file->getUUIDStr(), value);
+    }
+    session->transfer(flow_file, Success);
+  } catch (const std::exception &e) {
+    logger_->log_error("Caught exception while updating attributes: %s", 
e.what());
+    session->transfer(flow_file, Failure);
+    yield();
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index a384d21..950d8bb 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -17,10 +17,6 @@
  */
 
 #include "./TestBase.h"
-#include <memory>
-#include <vector>
-#include <set>
-#include <string>
 
 TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> 
prov_repo)
     :
@@ -115,7 +111,10 @@ bool linkToPrevious) {
   return addProcessor(processor, name, relationship, linkToPrevious);
 }
 
-bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const 
std::string &prop, const std::string &value) {
+bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc,
+                           const std::string &prop,
+                           const std::string &value,
+                           bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   uint32_t i = 0;
   logger_->log_info("Attempting to set property %s %s for %s", prop, value, 
proc->getName());
@@ -129,7 +128,11 @@ bool TestPlan::setProperty(const 
std::shared_ptr<core::Processor> proc, const st
     return false;
   }
 
-  return processor_contexts_.at(i)->setProperty(prop, value);
+  if (dynamic) {
+    return processor_contexts_.at(i)->setDynamicProperty(prop, value);
+  } else {
+    return processor_contexts_.at(i)->setProperty(prop, value);
+  }
 }
 
 void TestPlan::reset() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index c6cebb2..77449cb 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -160,7 +160,10 @@ class TestPlan {
   std::shared_ptr<core::Processor> addProcessor(const std::string 
&processor_name, const std::string &name, core::Relationship relationship = 
core::Relationship("success", "description"),
   bool linkToPrevious = false);
 
-  bool setProperty(const std::shared_ptr<core::Processor> proc, const 
std::string &prop, const std::string &value);
+  bool setProperty(const std::shared_ptr<core::Processor> proc,
+                   const std::string &prop,
+                   const std::string &value,
+                   bool dynamic = false);
 
   void reset();
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/unit/DynamicPropertyTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/DynamicPropertyTests.cpp 
b/libminifi/test/unit/DynamicPropertyTests.cpp
index fbc9dfd..f4391b1 100644
--- a/libminifi/test/unit/DynamicPropertyTests.cpp
+++ b/libminifi/test/unit/DynamicPropertyTests.cpp
@@ -78,7 +78,7 @@ TEST_CASE("Test Set Dynamic Property 3", 
"[testSetDynamicProperty2]") {
   component.setDynamicProperty("test", "value");
   component.setDynamicProperty("test2", "value2");
   std::string value;
-  auto propertyKeys = component.getDynamicProperyKeys();
+  auto propertyKeys = component.getDynamicPropertyKeys();
   REQUIRE(2 == propertyKeys.size());
   REQUIRE("test" == propertyKeys[0]);
   REQUIRE("test2" == propertyKeys[1]);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/5c252277/libminifi/test/unit/UpdateAttributeTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/UpdateAttributeTests.cpp 
b/libminifi/test/unit/UpdateAttributeTests.cpp
new file mode 100644
index 0000000..df1cf21
--- /dev/null
+++ b/libminifi/test/unit/UpdateAttributeTests.cpp
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../TestBase.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "processors/GenerateFlowFile.h"
+
+TEST_CASE("UpdateAttributeTest", "[updateAttributeTest]") {
+  TestController testController;
+
+  
LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>();
+  LogTestController::getInstance().setDebug<TestPlan>();
+  
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+
+  const auto &generate_proc = plan->addProcessor("GenerateFlowFile",
+                                                 "generate");
+  const auto &update_proc = plan->addProcessor("UpdateAttribute",
+                                               "update",
+                                               core::Relationship("success", 
"description"),
+                                               true);
+  const auto &log_proc = plan->addProcessor("LogAttribute",
+                                            "log",
+                                            core::Relationship("success", 
"description"),
+                                            true);
+
+  plan->setProperty(update_proc, "test_attr_1", "test_val_1", true);
+  plan->setProperty(update_proc, "test_attr_2", 
"test_val_${literal(1):plus(1)}", true);
+
+  testController.runSession(plan, false); // generate
+  testController.runSession(plan, false); // update
+  testController.runSession(plan, false); // log
+
+  REQUIRE(LogTestController::getInstance().contains("key:test_attr_1 
value:test_val_1"));
+  REQUIRE(LogTestController::getInstance().contains("key:test_attr_2 
value:test_val_2"));
+
+  LogTestController::getInstance().reset();
+}
+

Reply via email to