[GitHub] nifi-minifi-cpp issue #306: MINIFICPP-457: Network Management Controller Ser...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/306 @achristianson please review ---
[GitHub] nifi-minifi-cpp issue #301: MINIFICPP-459 Include FLexLexer.h in thirdparty ...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/301 looks good ---
[GitHub] nifi-minifi-cpp pull request #313: MINIFICPP-403: Add version into flow attr...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/313#discussion_r185257853 --- Diff: libminifi/test/resources/TestHTTPGet.yml --- @@ -19,6 +19,7 @@ Flow Controller: name: MiNiFi Flow id: 2438e3c8-015a-1000-79ca-83af40ec1990 +version: 1 --- End diff -- @phrocker that's the flow config YML version; I thought it referred to the flow version. When you update the YML file, the flow version changes as well. ---
[GitHub] nifi-minifi-cpp pull request #313: MINIFICPP-403: Add version into flow attr...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/313 MINIFICPP-403: Add version into flow attributes Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp add_version_to_flow Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/313.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #313 ---
[GitHub] nifi-minifi-cpp issue #306: MINIFICPP-457: Network Management Controller Ser...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/306 @achristianson add test case ---
[GitHub] nifi-minifi-cpp pull request #306: MINIFICPP-457: Network Management Control...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/306 MINIFICPP-457: Network Management Controller Service for interface binding You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp network_controller_interface_bind Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/306.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #306 commit ec65487e2ab6936c49c6822b9a3966e66e756379 Author: Bin Qiu <benqiu2016@...> Date: 2018-04-23T15:39:30Z MINIFICPP-457: Network Management Controller Service for interface binding ---
[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/295 @phrocker if that's the case, we can remove the default meta info below, which is added when the meta info container is constructed, and just provide the framework. If users want it, they can create their own meta info and add it to the meta info container. **We provide a framework that lets them add their own mutable meta info to the flow file.** : config_(configure) { +// add version, serial number as default meta info +std::unique_ptr<core::MetaInfo> version = std::unique_ptr<core::MetaInfo>(new VersionMetaInfo()); +addMetaInfo(std::move(version)); +std::string serial_number; +config_->get("device.id", serial_number); +state::metrics::Device device; +if (serial_number.empty()) { + // we did not config serial number, use the mac address + serial_number = device.device_id_; +} +std::unique_ptr<core::MetaInfo> serial_number_meta_info = std::unique_ptr<core::MetaInfo>(new MetaInfo("device.id", serial_number)); +addMetaInfo(std::move(serial_number_meta_info)); +std::unique_ptr<core::MetaInfo> hostname_meta_info = std::unique_ptr<core::MetaInfo>(new MetaInfo("hostname", device.canonical_hostname_)); +addMetaInfo(std::move(hostname_meta_info)); ---
[GitHub] nifi-minifi-cpp issue #297: MINIFICPP-446 Add escape/unescape HTML3 EL funct...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/297 looks good ---
[GitHub] nifi-minifi-cpp issue #299: MINIFICPP-454: Fix apt-get install statement
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/299 @phrocker looks good, please squash into one commit ---
[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/295 @phrocker please let me know whether you have more comments. Thanks. ---
[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/295 @phrocker please review. Changes: switched shared_ptr to unique_ptr; use agent.version and flow.version for meta info. ---
[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/295 @phrocker the reason I do not want to use a processor is that meta info belongs to the flow, which can be generated by any processor. We provide a framework that lets users write their own meta info and add it to the container; for example, they can add vendor-specific meta info. As for the shared_ptr, I can change it to unique_ptr. I picked shared_ptr because it is more flexible in case the meta info needs to be used somewhere besides the container. ---
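The framework described in this comment can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `MetaInfoContainer`, `collect()` and `VendorMetaInfo` are hypothetical names, and `MetaInfo` here is a simplified stand-in for the `core::MetaInfo` class shown in the diff. It uses `std::unique_ptr`, as agreed in the review, so the container is the sole owner of each entry.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for core::MetaInfo: a named value attached to a flow.
class MetaInfo {
 public:
  MetaInfo(std::string name, std::string value)
      : name_(std::move(name)), value_(std::move(value)) {}
  virtual ~MetaInfo() = default;

  const std::string& getName() const { return name_; }
  // Subclasses may override this to compute the value on demand.
  virtual std::string getValue() const { return value_; }

 private:
  std::string name_;
  std::string value_;
};

// Hypothetical container: starts empty and owns whatever the user registers.
class MetaInfoContainer {
 public:
  void addMetaInfo(std::unique_ptr<MetaInfo> info) {
    infos_.push_back(std::move(info));
  }

  // Gather name/value pairs that could be copied into flow file attributes.
  std::map<std::string, std::string> collect() const {
    std::map<std::string, std::string> attributes;
    for (const auto& info : infos_) {
      attributes[info->getName()] = info->getValue();
    }
    return attributes;
  }

 private:
  std::vector<std::unique_ptr<MetaInfo>> infos_;
};

// Example of a user-defined entry, such as a vendor tag (illustrative value).
class VendorMetaInfo : public MetaInfo {
 public:
  VendorMetaInfo() : MetaInfo("vendor.name", "") {}
  std::string getValue() const override { return "example-vendor"; }
};
```

With unique ownership, an entry that must be visible outside the container would have to be exposed by reference or copied, which is the flexibility trade-off the comment mentions for `shared_ptr`.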
[GitHub] nifi-minifi-cpp pull request #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/295#discussion_r179779744 --- Diff: libminifi/include/core/MetaInfo.h --- @@ -0,0 +1,180 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __META_INFO_H__ +#define __META_INFO_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils/StringUtils.h" +#include "core/FlowFile.h" +#include "core/state/metrics/DeviceInformation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// MetaInfo Class +class MetaInfo { + + public: + // Constructor + /*! 
+ * Create a new meta info + */ + explicit MetaInfo(const std::string &name, const std::string &value) + : name_(name), +value_(value) { + } + + // Destructor + virtual ~MetaInfo() { + } + + // Get Name for the meta info + std::string getName() { +return name_; + } + // Get value for the meta info, overwritten by specific child meta info + virtual std::string getValue(const std::shared_ptr ) { +return value_; + } + + protected: + // Name + std::string name_; + // Value + std::string value_; + + private: +}; + +#ifdef MINIFI_VERSION --- End diff -- will do ---
[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/295 @phrocker Thanks for the review. The PR is not only about the version; it also provides a flexible framework for adding other meta info. Also, the CMake file already specifies the major/minor/patch version, so why do we need a separate generateVersion.sh? ---
[GitHub] nifi-minifi-cpp pull request #295: MINFICPP-403: Flow Meta tagging
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/295 MINFICPP-403: Flow Meta tagging You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp meta_info Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/295.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #295 commit 22f52fb9d225233ba63df04eb8c91ca9af94a4ba Author: Bin Qiu <benqiu2016@...> Date: 2018-04-03T15:57:23Z MINFICPP-403: Flow Meta tagging ---
[GitHub] nifi-minifi-cpp pull request #280: MINIFICPP-404: http proxy support for s2s
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/280 MINIFICPP-404: http proxy support for s2s You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp http_proxy Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #280 commit e3ae4cdae5e1b474a4bab0b8cb5fe530f6a7f741 Author: Bin Qiu <benqiu2016@...> Date: 2018-03-08T16:05:35Z MINIFICPP-404: http proxy support for s2s ---
[GitHub] nifi-minifi-cpp pull request #268: MINIFICPP-397 Added implementation of Rou...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/268#discussion_r171389409 --- Diff: libminifi/src/processors/RouteOnAttribute.cpp --- @@ -0,0 +1,107 @@ +/** + * @file RouteOnAttribute.cpp + * RouteOnAttribute class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "processors/RouteOnAttribute.h" + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Relationship RouteOnAttribute::Unmatched( +"unmatched", +"Files which do not match any expression are routed here"); +core::Relationship RouteOnAttribute::Failure( +"failure", +"Failed files are transferred to failure"); + +void RouteOnAttribute::initialize() { + std::set properties; + setSupportedProperties(properties); +} + +void RouteOnAttribute::onDynamicPropertyModified(const core::Property _property, + const core::Property _property) { + + // Update the routing table when routes are added via dynamic properties. 
+ route_properties_[new_property.getName()] = new_property; + + std::set<core::Relationship> relationships; + + for (const auto &route : route_properties_) { +core::Relationship route_rel{route.first, "Dynamic route"}; +route_rels_[route.first] = route_rel; +relationships.insert(route_rel); +logger_->log_info("RouteOnAttribute registered route '%s' with expression '%s'", + route.first, + route.second.getValue()); + } + + relationships.insert(Unmatched); + relationships.insert(Failure); + setSupportedRelationships(relationships); --- End diff -- OK. ---
[GitHub] nifi-minifi-cpp pull request #268: MINIFICPP-397 Added implementation of Rou...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/268#discussion_r171293683 --- Diff: libminifi/src/processors/RouteOnAttribute.cpp --- @@ -0,0 +1,107 @@ +/** + * @file RouteOnAttribute.cpp + * RouteOnAttribute class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "processors/RouteOnAttribute.h" + +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Relationship RouteOnAttribute::Unmatched( +"unmatched", +"Files which do not match any expression are routed here"); +core::Relationship RouteOnAttribute::Failure( +"failure", +"Failed files are transferred to failure"); + +void RouteOnAttribute::initialize() { + std::set properties; + setSupportedProperties(properties); +} + +void RouteOnAttribute::onDynamicPropertyModified(const core::Property _property, + const core::Property _property) { + + // Update the routing table when routes are added via dynamic properties. 
+ route_properties_[new_property.getName()] = new_property; + + std::set<core::Relationship> relationships; + + for (const auto &route : route_properties_) { +core::Relationship route_rel{route.first, "Dynamic route"}; +route_rels_[route.first] = route_rel; +relationships.insert(route_rel); +logger_->log_info("RouteOnAttribute registered route '%s' with expression '%s'", + route.first, + route.second.getValue()); + } + + relationships.insert(Unmatched); + relationships.insert(Failure); + setSupportedRelationships(relationships); --- End diff -- bool Connectable::setSupportedRelationships(std::set<core::Relationship> relationships) { if (isRunning()) { logger_->log_warn("Can not set processor supported relationship while the process %s is running", name_); return false; } What happens if onDynamicPropertyModified is called while the processor is running? ---
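The concern raised here can be illustrated with a small self-contained sketch. It is not MiNiFi code: `Component` is a hypothetical, simplified stand-in for a `Connectable`-like class, relationships are reduced to plain strings, and returning a `bool` from the dynamic-property hook is an assumption made only so the rejected update is visible to the caller.

```cpp
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for a Connectable-like component.
class Component {
 public:
  void setRunning(bool running) { running_ = running; }

  // Mirrors the guard quoted in the comment: refuse the update while running.
  bool setSupportedRelationships(std::set<std::string> relationships) {
    if (running_) {
      return false;
    }
    relationships_ = std::move(relationships);
    return true;
  }

  // Assumed hook: adds a dynamic route and reports whether it took effect.
  bool onDynamicPropertyModified(const std::string& route_name) {
    std::set<std::string> updated = relationships_;
    updated.insert(route_name);
    return setSupportedRelationships(std::move(updated));
  }

  bool supports(const std::string& name) const {
    return relationships_.count(name) > 0;
  }

 private:
  bool running_ = false;
  std::set<std::string> relationships_{"unmatched", "failure"};
};
```

In this sketch a route added while the component is running is silently dropped unless the caller checks the return value, which is the situation the question points at.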
[GitHub] nifi-minifi-cpp pull request #272: MINIFICPP-405: RPG bind to local interfac...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/272 MINIFICPP-405: RPG bind to local interface You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp local_network_interface Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/272.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #272 commit 1313bedffd957444dbfd807f26a3d7ac110fdc89 Author: Bin Qiu <benqiu2016@...> Date: 2018-02-26T15:21:24Z MINIFICPP-405: RPG bind to local interface ---
[GitHub] nifi-minifi-cpp issue #260: MINIFICPP-382: Implement SUSE release support fo...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/260 Looks good. ---
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r16631 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- Fixed in latest commit ---
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166130342 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- The previous parameter configuration was wrong, so this fix is not backward compatible. ---
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166072113 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -119,6 +126,38 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc qos_ = valInt; logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_); } + value = ""; + + if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) { +if (value == MQTT_SECURITY_PROTOCOL_SSL) { + sslEnabled_ = true; --- End diff -- The SSL handshake will fail, and based on the error the user needs to configure the right certs. ---
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166071814 --- Diff: extensions/mqtt/ConsumeMQTT.cpp --- @@ -35,7 +35,7 @@ namespace nifi { namespace minifi { namespace processors { -core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum flow content payload segment size for the MQTT record", ""); +core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum receive queue size for the MQTT record", ""); --- End diff -- We need to correct that because the property name was wrong in the first place. Also, if a user was setting the old option, the queue size will simply fall back to the default value. ---
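The fallback behavior described in this comment can be sketched as below. This is an illustration only: `lookupSizeProperty` is a hypothetical helper, the property map stands in for a processor's configured properties, and the default of 1000 used in the usage note is an arbitrary value, not the processor's real default.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical lookup: return the configured value for `name`, or `fallback`
// when the flow configuration does not set that property.
// Note: std::stoull throws if the configured text is not a number.
uint64_t lookupSizeProperty(const std::map<std::string, std::string>& properties,
                            const std::string& name,
                            uint64_t fallback) {
  const auto it = properties.find(name);
  if (it == properties.end() || it->second.empty()) {
    return fallback;
  }
  return std::stoull(it->second);
}
```

A flow that still sets only "Max Flow Segment Size" has no entry under "Max Queue Size", so `lookupSizeProperty(props, "Max Queue Size", 1000)` returns the fallback rather than the user's old value.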
[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/259 MINIFICPP-393: Add security support for MQTT You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_security Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/259.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #259 commit c5d46bdb9dde46c9c3b8e2bb2f8936c5b59c861d Author: Bin Qiu <benqiu2016@...> Date: 2018-02-05T15:32:14Z MINIFICPP-393: Add security support for MQTT ---
[GitHub] nifi-minifi-cpp pull request #242: MINIFICPP-374: Commit Linux power managem...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/242#discussion_r162174433 --- Diff: README.md --- @@ -609,6 +609,21 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned class: ControllerServiceClass Properties: +### Linux Power Manager Controller Service + The linux power manager controller service can be configured to monitor the battery level and status ( discharging or charging ) via the following configuration. + Simply provide the capacity path and status path along with your threshold for the trigger and low battery alarm and you can monitor your battery and throttle + the threadpools within MiNiFi C++. Note that the name is identified must be ThreadPoolManager. + + Controller Services: +- name: ThreadPoolManager + id: 2438e3c8-015a-1000-79ca-83af40ec1888 + class: LinuxPowerManagerService + Properties: + Battery Capacity Path: /path/to/battery/capacity + Battery Status Path: /path/to/battery/status --- End diff -- Overall looks good. One question: why is this implemented as a controller service instead of a processor? It looks more like a processor that monitors the standard proc filesystem and performs some user-configured actions. ---
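The monitoring that the README text describes could look roughly like the sketch below. It is a guess at the shape of the logic, not the service's implementation: the function names are hypothetical, the capacity file is assumed to hold an integer percentage, the status file is assumed to hold a word such as "Discharging" (the string Linux power-supply status files use), and the throttling rule is an assumption.

```cpp
#include <fstream>
#include <string>

// Read an integer percentage from the capacity file; -1 if unreadable.
int readBatteryCapacity(const std::string& capacity_path) {
  std::ifstream in(capacity_path);
  int value = -1;
  if (!(in >> value)) {
    return -1;
  }
  return value;
}

// True when the first word of the status file is "Discharging".
bool isDischarging(const std::string& status_path) {
  std::ifstream in(status_path);
  std::string word;
  in >> word;
  return word == "Discharging";
}

// Assumed rule: throttle only while discharging at or below the trigger
// threshold, and never on an unreadable capacity.
bool shouldThrottle(int capacity, bool discharging, int trigger_threshold) {
  return discharging && capacity >= 0 && capacity <= trigger_threshold;
}
```

Nothing in this logic depends on flow files, which is one way to read the design choice being questioned: a controller service can poll these files and adjust thread pools without sitting in the flow graph.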
[GitHub] nifi-minifi-cpp issue #236: MINIFICPP-337: make default log directory as log...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/236 @apiri now using MINIFI_HOME/logs. Please review. ---
[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/236#discussion_r161296430 --- Diff: libminifi/src/core/logging/LoggerConfiguration.cpp --- @@ -110,6 +112,17 @@ std::shared_ptr LoggerConfiguration::initialize_names if (!logger_properties->get(appender_key + ".file_name", file_name)) { file_name = "minifi-app.log"; } + std::string directory = ""; + if (logger_properties->get(appender_key + ".directory", directory)) { +// Create the log directory if needed +struct stat logDirStat; +if (stat(directory.c_str(), &logDirStat) != 0 || !S_ISDIR(logDirStat.st_mode)) { + if (mkdir(directory.c_str(), 0777) == -1) { --- End diff -- So if you start bin/minifi.sh, the log directory will be ./logs relative to the directory you start it from. ---
[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/236#discussion_r161295854 --- Diff: libminifi/src/core/logging/LoggerConfiguration.cpp --- @@ -110,6 +112,17 @@ std::shared_ptr LoggerConfiguration::initialize_names if (!logger_properties->get(appender_key + ".file_name", file_name)) { file_name = "minifi-app.log"; } + std::string directory = ""; + if (logger_properties->get(appender_key + ".directory", directory)) { +// Create the log directory if needed +struct stat logDirStat; +if (stat(directory.c_str(), &logDirStat) != 0 || !S_ISDIR(logDirStat.st_mode)) { + if (mkdir(directory.c_str(), 0777) == -1) { --- End diff -- We read the directory variable from minifi-log.properties, which specifies the log name and log directory. The directory can be an absolute path or a relative path. ---
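The create-if-missing step in this diff can be isolated into a small POSIX helper. The sketch below is a variant written for illustration, not the PR's code: `ensureLogDirectory` is a hypothetical name, and unlike the diff it returns early when the path exists but is not a directory instead of attempting `mkdir` on it.

```cpp
#include <sys/stat.h>
#include <sys/types.h>

#include <string>

// Returns true if `directory` exists as a directory once the call completes.
// A relative path is resolved against the process's current working directory.
bool ensureLogDirectory(const std::string& directory) {
  struct stat dir_stat;
  if (stat(directory.c_str(), &dir_stat) == 0) {
    // The path already exists; succeed only if it is a directory.
    return S_ISDIR(dir_stat.st_mode);
  }
  // The path is missing: create a single level (parents must already exist).
  return mkdir(directory.c_str(), 0777) == 0;
}
```

Because `mkdir` creates only one level, a configured value such as `logs/app/today` fails unless `logs/app` is already present; the effective permissions are also reduced by the process umask.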
[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/236 MINIFICPP-337: make default log directory as logs You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp MINIFICPP-337 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/236.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #236 commit de8a48c8b15bdec4d5083af2088e5b308faf5ef7 Author: Bin Qiu <benqiu2016@...> Date: 2018-01-10T16:49:58Z MINIFICPP-337: make default log directory as logs ---
[GitHub] nifi-minifi-cpp issue #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/228 @phrocker addressed comments, please review. ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +retain_ = false; +max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string , MQTTClient client, +int qos, bool retain, MQTTClient_deliveryToken ) : +flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), +qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + if (flow_size_ < max_seg_size_) +max_seg_size_ = flow_size_; + std::vector buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { +int readRet = stream->read([0], max_seg_size_); +if (readRet < 0) { + status_ = -1; + return read_size_; +} +if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = [0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), , _) != MQTTCLIENT_SUCCESS) { --- End diff -- It adds the MQTT header and calls socket write. The delivery-complete callback is for QoS. /** * This is a callback function. The client application * must provide an implementation of this function to enable asynchronous * notification of delivery of messages. The function is registered with the * client library by passing it as an argument to MQTTClient_setCallbacks().
* It is called by the client library after the client application has * published a message to the server. It indicates that the necessary * handshaking and acknowledgements for the requested quality of service (see * MQTTClient_message.qos) have been completed. This function is executed on a * separate thread to the one on which the client application is running. * Note:MQTTClient_deliveryComplete() is not called when messages are * published at QoS0. * @param context A pointer to the context value originally passed to * MQTTClient_setCallbacks(), which contains any application-specific context. * @p
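The quoted `ReadCallback::process` loop reads the flow file in chunks of at most `max_seg_size_` bytes and publishes each chunk as its own MQTT message. A minimal sketch of that chunking follows. It makes no Paho calls: `PublishFn` and `publish_in_segments` are hypothetical names introduced here, with the callback standing in for `MQTTClient_publishMessage`.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for MQTTClient_publishMessage: returns true on success.
using PublishFn = std::function<bool(const std::uint8_t *data, std::size_t len)>;

// Splits `payload` into segments of at most `max_seg_size` bytes and hands
// each segment to `publish`. Returns the number of bytes published, or -1 if
// the segment size is zero or any publish call fails.
std::int64_t publish_in_segments(const std::vector<std::uint8_t> &payload,
                                 std::size_t max_seg_size,
                                 const PublishFn &publish) {
  if (max_seg_size == 0) {
    return -1;
  }
  std::size_t sent = 0;
  while (sent < payload.size()) {
    // The last segment may be shorter than max_seg_size.
    const std::size_t len = std::min(max_seg_size, payload.size() - sent);
    if (!publish(payload.data() + sent, len)) {
      return -1;
    }
    sent += len;
  }
  return static_cast<std::int64_t>(sent);
}
```

With a 10-byte payload and a 4-byte segment limit, the callback would be invoked with lengths 4, 4 and 2.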
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097 --- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt --- @@ -0,0 +1,86 @@ +#*** +# Copyright (c) 2015, 2017 logi.cals GmbH and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +#http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Rainer Poisel - initial version +# Genis Riera Perez - Add support for building debian package +#***/ + +# Note: on OS X you should install XCode and the associated command-line tools + +CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4) +PROJECT("paho" C) +MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION}) +MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME}) + +SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake") +SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules") + +## build settings +SET(PAHO_VERSION_MAJOR 1) +SET(PAHO_VERSION_MINOR 2) +SET(PAHO_VERSION_PATCH 0) +SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH}) + +INCLUDE(GNUInstallDirs) + +STRING(TIMESTAMP BUILD_TIMESTAMP UTC) +MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}") + +IF(WIN32) + ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD) +ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin") + ADD_DEFINITIONS(-DOSX) +ENDIF() + +## build options +SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. 
") +SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library") +SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)") +SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs") +SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package") +SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run") + +ADD_SUBDIRECTORY(src) +IF(PAHO_BUILD_SAMPLES) +ADD_SUBDIRECTORY(src/samples) +ENDIF() + +IF(PAHO_BUILD_DOCUMENTATION) +ADD_SUBDIRECTORY(doc) +ENDIF() + +### packaging settings +IF (WIN32) +SET(CPACK_GENERATOR "ZIP") +ELSEIF(PAHO_BUILD_DEB_PACKAGE) +SET(CPACK_GENERATOR "DEB") +CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in +${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY) +SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake) +ADD_SUBDIRECTORY(debian) +ELSE() +SET(CPACK_GENERATOR "TGZ") +ENDIF() + +SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR}) +SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR}) +SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH}) +INCLUDE(CPack) + +IF(PAHO_ENABLE_TESTING) --- End diff -- will remove ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086 --- Diff: thirdparty/paho.mqtt.c/.travis.yml --- @@ -0,0 +1,47 @@ +sudo: true --- End diff -- will remove ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405 --- Diff: extensions/mqtt/ConsumeMQTT.h --- @@ -0,0 +1,125 @@ +/** + * @file ConsumeMQTT.h + * ConsumeMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __CONSUME_MQTT_H__ +#define __CONSUME_MQTT_H__ + +#include +#include +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" +#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" + +// ConsumeMQTT Class +class ConsumeMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL) +: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory::getLogger()) { +isSubscriber_ = true; +maxQueueSize_ = 100; + } + // Destructor + virtual ~ConsumeMQTT() { +std::lock_guard < std::mutex > lock(mutex_); +while (!queue_.empty()) { + MQTTClient_message *message = queue_.front(); + MQTTClient_freeMessage(); + queue_.pop_front(); +} + } + // Processor Name + static constexpr char const* ProcessorName = "ConsumeMQTT"; + // Supported Properties + static core::Property MaxQueueSize; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(MQTTClient_message *message) : + message_(message) { + status_ = 0; +} +MQTTClient_message *message_; +int64_t process(std::shared_ptr stream) { + int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen); + if (len < 0) +status_ = -1; + return len; +} +int status_; + }; + +public: + /** + * Function that's executed when the processor is scheduled. + * @param context process context. + * @param sessionFactory process session factory that is used when creating + * ProcessSession objects. + */ + void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + // OnTrigger method, implemented by NiFi ConsumeMQTT + virtual void onTrigger(const std::shared_ptr , const std::shared_ptr ); + // Initialize, over write by NiFi ConsumeMQTT + virtual void initialize(void); + virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message); + +protected: + void getReceivedMQTTMsg(std::deque _queue) { +std::lock_guard < std::mutex > lock(mutex_); --- End diff -- will do ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371 --- Diff: extensions/mqtt/AbstractMQTTProcessor.h --- @@ -0,0 +1,154 @@ +/** + * @file AbstractMQTTProcessor.h + * AbstractMQTTProcessor class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __ABSTRACTMQTT_H__ +#define __ABSTRACTMQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" + +#define MQTT_QOS_0 "0" +#define MQTT_QOS_1 "1" +#define MQTT_QOS_2 "2" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// AbstractMQTTProcessor Class +class AbstractMQTTProcessor : public core::Processor { + public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL) + : core::Processor(name, uuid), + logger_(logging::LoggerFactory::getLogger()) { +client_ = nullptr; +cleanSession_ = false; +keepAliveInterval_ = 60; +connectionTimeOut_ = 30; +qos_ = 0; +isSubscriber_ = false; + } + // Destructor + virtual ~AbstractMQTTProcessor() { +if (isSubscriber_) { + MQTTClient_unsubscribe(client_, topic_.c_str()); --- End diff -- It should be. The only side effect arises if the app halts without unsubscribing: the broker buffers messages for the topic while the client is away, and if the app restarts with the same topic and the same client ID, it receives all the messages the broker buffered for that client. ---
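The buffering behaviour described in this comment can be pictured with a toy in-memory model. This is only a sketch under assumptions: `ToyBroker` and its methods are hypothetical, it is neither Paho code nor a real broker, and it assumes a persistent session (the quoted constructor sets `cleanSession_ = false`).

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

// Toy model only: all names are hypothetical and nothing here talks MQTT.
class ToyBroker {
 public:
  void subscribe(const std::string &client_id, const std::string &topic) {
    subscriptions_[topic].insert(client_id);
  }

  void unsubscribe(const std::string &client_id, const std::string &topic) {
    subscriptions_[topic].erase(client_id);
  }

  // Queues the message for every client ID still subscribed to the topic,
  // whether or not that client is currently connected.
  void publish(const std::string &topic, const std::string &message) {
    for (const auto &client_id : subscriptions_[topic]) {
      pending_[client_id].push_back(message);
    }
  }

  // Models a client (re)connecting with the same client ID: returns and
  // clears everything that was buffered for it.
  std::vector<std::string> drain(const std::string &client_id) {
    std::vector<std::string> out;
    out.swap(pending_[client_id]);
    return out;
  }

 private:
  std::map<std::string, std::set<std::string>> subscriptions_;
  std::map<std::string, std::vector<std::string>> pending_;
};
```

In this model, a client that halts without calling `unsubscribe` still receives on restart whatever was published in the meantime, while a client that unsubscribed first receives nothing.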
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include +#include +#include +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { +uri_ = value; +logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { +clientID_ = value; +logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty
[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/227 Looks good. Can we merge this into one commit? ---
[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/228 MINIFICPP-342: MQTT extension You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_dev Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/228.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #228 commit cc47483e04164102b2382a78bf68d10f4e8f5efe Author: Bin Qiu <benqiu2016@...> Date: 2018-01-04T16:23:14Z MINIFICPP-342: MQTT extension ---
[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/217 yes, +1 ---
[GitHub] nifi-minifi-cpp issue #216: MINIFICPP-341 Add CentOS6 build instructions
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/216 Please resolve the conflict. ---
[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/217 Please merge these two commits. ---
[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/217 looks good ---
[GitHub] nifi-minifi-cpp pull request #217: MINIFICPP-41: First iteration of C api
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/217#discussion_r156161507 --- Diff: libminifi/src/capi/api.cpp --- @@ -0,0 +1,251 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include "core/Core.h" +#include "capi/api.h" +#include "capi/expect.h" +#include "capi/Instance.h" +#include "capi/Plan.h" +#include "ResourceClaim.h" + +/** + * Creates a NiFi Instance from the url and output port. + * @param url http URL for NiFi instance + * @param port Remote output port. + */ +nifi_instance *create_instance(char *url, nifi_port *port) { + minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); + nifi_instance *instance = new nifi_instance; + + instance->instance_ptr = new minifi::Instance(url, port->pord_id); + instance->port.pord_id = port->pord_id; + + return instance; +} + +/** + * Initializes the instance + */ +void initialize_instance(nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->initialize(instance->port.pord_id); --- End diff -- same as above, we need to init instance just once. ---
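One way to make repeated initialization calls harmless is a guarded flag on the instance. The sketch below is an assumption-laden illustration, not the PR's code: `InitDemo` and `initialize_once` are hypothetical names, and the counter exists only to make the effect observable.

```cpp
#include <mutex>

// Hypothetical stand-in for the instance wrapper; not the struct from the PR.
struct InitDemo {
  std::mutex mutex;
  bool initialized = false;
  int init_count = 0;  // present only so the effect can be observed
};

// Performs the initialization at most once per instance. Returns true if
// this call did the work and false if it had already been done.
bool initialize_once(InitDemo &instance) {
  std::lock_guard<std::mutex> lock(instance.mutex);
  if (instance.initialized) {
    return false;
  }
  ++instance.init_count;  // the real setup work would go here
  instance.initialized = true;
  return true;
}
```

Calling `initialize_once` twice on the same object leaves `init_count` at 1.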
[GitHub] nifi-minifi-cpp pull request #217: MINIFICPP-41: First iteration of C api
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/217#discussion_r156161257 --- Diff: libminifi/src/capi/api.cpp --- @@ -0,0 +1,251 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include "core/Core.h" +#include "capi/api.h" +#include "capi/expect.h" +#include "capi/Instance.h" +#include "capi/Plan.h" +#include "ResourceClaim.h" + +/** + * Creates a NiFi Instance from the url and output port. + * @param url http URL for NiFi instance + * @param port Remote output port. + */ +nifi_instance *create_instance(char *url, nifi_port *port) { + minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); --- End diff -- Does the instance need to be a singleton, so that a user does not create the instance again when calling this a second time? ---
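If a singleton-style guard were wanted here, one possible shape is a small registry that hands back the existing object when the same URL is requested again. This is a sketch only: `UrlInstance` and `get_or_create_instance` are hypothetical and are not part of the C API under review.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in type; the real nifi_instance is not modelled here.
struct UrlInstance {
  std::string url;
};

// Returns the instance already created for `url`, or creates and remembers
// a new one on the first request for that URL.
std::shared_ptr<UrlInstance> get_or_create_instance(const std::string &url) {
  static std::mutex mutex;
  static std::map<std::string, std::shared_ptr<UrlInstance>> instances;

  std::lock_guard<std::mutex> lock(mutex);
  auto it = instances.find(url);
  if (it != instances.end()) {
    return it->second;  // second and later calls reuse the first object
  }
  auto created = std::make_shared<UrlInstance>(UrlInstance{url});
  instances.emplace(url, created);
  return created;
}
```

Keying the registry by URL is a design choice made for this example; a strict process-wide singleton would drop the map and keep a single static pointer.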
[GitHub] nifi-minifi-cpp issue #188: MINIFICPP-49 Added initial implementation of NiF...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/188 @achristianson there is a merge conflict. ---
[GitHub] nifi-minifi-cpp pull request #188: MINIFICPP-49 Added initial implementation...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/188#discussion_r154135525 --- Diff: extensions/expression-language/ProcessContextExpr.cpp --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +bool ProcessContext::getProperty(const std::string , std::string , + const std::shared_ptr _file) { + if (expressions_.find(name) == expressions_.end()) { --- End diff -- OK. ---
[GitHub] nifi-minifi-cpp issue #188: MINIFICPP-49 Added initial implementation of NiF...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/188 looks good to me +1 ---
[GitHub] nifi-minifi-cpp pull request #188: MINIFICPP-49 Added initial implementation...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/188#discussion_r153617015 --- Diff: extensions/expression-language/ProcessContextExpr.cpp --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +bool ProcessContext::getProperty(const std::string , std::string , + const std::shared_ptr _file) { + if (expressions_.find(name) == expressions_.end()) { --- End diff -- I did not see the code path for EL in property handling. It would make sense to update the README to illustrate an example use case for EL. ---
[GitHub] nifi-minifi-cpp pull request #209: MINIFICPP-329: Change PutKafka to Publish...
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/209 MINIFICPP-329: Change PutKafka to PublishKafka Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp kafka_refactor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/209.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #209 commit 2bcec3a815077cd330814a9a07f64d8ada01e90a Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-11-28T15:25:15Z MINIFICPP-329: Change PutKafka to PublishKafka ---
[GitHub] nifi-minifi-cpp issue #183: MINIFICPP-301 Added initial implementation of TF...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/183 @achristianson please squash into one commit. ---
[GitHub] nifi-minifi-cpp issue #182: MINIFICPP-274: PutKafka Processor
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/182 @phrocker addressed the review comments. ---
[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150886943 --- Diff: extensions/tensorflow/TFApplyGraph.cpp --- @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "TFApplyGraph.h" + +#include "tensorflow/cc/ops/standard_ops.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property TFApplyGraph::InputNode( // NOLINT +"Input Node", +"The node of the TensorFlow graph to feed tensor inputs to", ""); +core::Property TFApplyGraph::OutputNode( // NOLINT +"Output Node", +"The node of the TensorFlow graph to read tensor outputs from", ""); + +core::Relationship TFApplyGraph::Success( // NOLINT +"success", +"Successful graph application outputs"); +core::Relationship TFApplyGraph::Retry( // NOLINT +"retry", +"Inputs which fail graph application but may work if sent again"); +core::Relationship TFApplyGraph::Failure( // NOLINT +"failure", +"Failures which will not work if retried"); + +void TFApplyGraph::initialize() { + std::set properties; + properties.insert(InputNode); + properties.insert(OutputNode); + setSupportedProperties(std::move(properties)); + + std::set relationships; + relationships.insert(Success); + relationships.insert(Retry); + relationships.insert(Failure); + setSupportedRelationships(std::move(relationships)); +} + +void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + context->getProperty(InputNode.getName(), input_node_); + + if (input_node_.empty()) { +logger_->log_error("Invalid input node"); + } + + context->getProperty(OutputNode.getName(), output_node_); + + if (output_node_.empty()) { +logger_->log_error("Invalid output node"); + } +} + +void TFApplyGraph::onTrigger(const std::shared_ptr , + const std::shared_ptr ) { + auto flow_file = session->get(); --- End diff -- We overwrite the flow input with the output. Do we have a use case where we need to keep the input and create a new flow file for the output? ---
[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150886217 --- Diff: extensions/tensorflow/TFApplyGraph.h --- @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NIFI_MINIFI_CPP_TFAPPLYGRAPH_H +#define NIFI_MINIFI_CPP_TFAPPLYGRAPH_H + +#include + +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class TFApplyGraph : public core::Processor { + public: + explicit TFApplyGraph(const std::string , uuid_t uuid = nullptr) + : Processor(name, uuid), +logger_(logging::LoggerFactory::getLogger()) { + } + + static void hello(); --- End diff -- unused hello ---
[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150878519 --- Diff: extensions/tensorflow/TFApplyGraph.cpp --- @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "TFApplyGraph.h" + +#include "tensorflow/cc/ops/standard_ops.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property TFApplyGraph::InputNode( // NOLINT +"Input Node", +"The node of the TensorFlow graph to feed tensor inputs to", ""); +core::Property TFApplyGraph::OutputNode( // NOLINT +"Output Node", +"The node of the TensorFlow graph to read tensor outputs from", ""); + +core::Relationship TFApplyGraph::Success( // NOLINT +"success", +"Successful graph application outputs"); +core::Relationship TFApplyGraph::Retry( // NOLINT +"retry", +"Inputs which fail graph application but may work if sent again"); +core::Relationship TFApplyGraph::Failure( // NOLINT +"failure", +"Failures which will not work if retried"); + +void TFApplyGraph::initialize() { + std::set properties; + properties.insert(InputNode); + properties.insert(OutputNode); + setSupportedProperties(std::move(properties)); + + std::set relationships; + relationships.insert(Success); + relationships.insert(Retry); + relationships.insert(Failure); + setSupportedRelationships(std::move(relationships)); +} + +void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + context->getProperty(InputNode.getName(), input_node_); + + if (input_node_.empty()) { +logger_->log_error("Invalid input node"); + } + + context->getProperty(OutputNode.getName(), output_node_); + + if (output_node_.empty()) { +logger_->log_error("Invalid output node"); + } +} + +void TFApplyGraph::onTrigger(const std::shared_ptr , + const std::shared_ptr ) { + auto flow_file = session->get(); + + if (!flow_file) { +return; + } + + try { +// Read graph +std::string tf_type; +flow_file->getAttribute("tf.type", tf_type); + +std::shared_ptr graph_def; +uint32_t graph_version; + +{ + std::lock_guard guard(graph_def_mtx_); + + if ("graph" == tf_type) { --- End diff -- after we reload the graph, do we need to clean the 
context that is associated with the old graph? ---
[GitHub] nifi-minifi-cpp pull request #181: MINIFICPP-297: Remove Boost dependencies ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/181#discussion_r150865974 --- Diff: libminifi/include/utils/file/FileManager.h --- @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_UTILS_FILEMANAGER_H_ +#define LIBMINIFI_INCLUDE_UTILS_FILEMANAGER_H_ + +#ifdef BOOST_VERSION +#include +#else +#include +#endif +#include +#include +#include +#include "io/validation.h" +#include "utils/Id.h" +#include "utils/StringUtils.h" +#ifdef WIN32 +#define stat _stat +#endif + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { +namespace file { + +/** + * Simple implementation of simple file manager utilities. + * + * unique_file is not a static implementation so that we can support scope driven temporary files. 
+ */ +class FileManager { + public: + + FileManager() { + } + + ~FileManager() { +for (auto file : unique_files_) { + unlink(file.c_str()); +} + } + std::string unique_file(const std::string , bool keep = false) { + +if (!IsNullOrEmpty(location)) { + std::string file_name = location + "/" + non_repeating_string_generator_.generate(); + while (!verify_not_exist(file_name)) { +file_name = location + "/" + non_repeating_string_generator_.generate(); + } + if (!keep) +unique_files_.push_back(file_name); --- End diff -- Why do we need to unlink later, in the destructor? ---
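The class comment in the diff mentions "scope driven temporary files", which suggests the deferred unlink is there so that files live exactly as long as the manager object does. The sketch below illustrates that pattern under stated assumptions: `ScopedFiles`, `create`, and `file_exists` are hypothetical names, and it uses `std::remove` from the standard library rather than the `unlink` call shown in the PR.

```cpp
#include <cstdio>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical helper illustrating scope-driven cleanup; NOT the PR's FileManager.
class ScopedFiles {
 public:
  ScopedFiles() = default;
  // Non-copyable, so that two objects never try to delete the same files.
  ScopedFiles(const ScopedFiles&) = delete;
  ScopedFiles& operator=(const ScopedFiles&) = delete;

  // Deletes every tracked file when the object goes out of scope.
  ~ScopedFiles() {
    for (const auto& path : tracked_) {
      std::remove(path.c_str());  // best-effort cleanup; errors are ignored
    }
  }

  // Creates an empty file. It is tracked for deletion unless keep is true.
  bool create(const std::string& path, bool keep = false) {
    std::ofstream out(path);
    if (!out) {
      return false;
    }
    if (!keep) {
      tracked_.push_back(path);
    }
    return true;
  }

 private:
  std::vector<std::string> tracked_;
};

// Returns true if the file can be opened for reading.
inline bool file_exists(const std::string& path) {
  std::ifstream in(path);
  return in.good();
}
```

With this shape the caller can keep using the file for as long as the `ScopedFiles` object is alive, and the `keep` flag opts a file out of cleanup, which mirrors the `keep` parameter visible in the diff.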
[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150858096 --- Diff: CMakeLists.txt --- @@ -155,6 +155,13 @@ if (ENABLE_USB_CAMERA) createExtension(USB-CAMERA-EXTENSIONS "USB CAMERA EXTENSIONS" "This enables USB camera support" "extensions/usb-camera" "${TEST_DIR}/usb-camera-tests" "TRUE" "thirdparty/libuvc-0.0.6") endif() +## TensorFlow extensions +## Disabled by default because TF can be complex/environment-specific to build +option(ENABLE_TENSORFLOW "Disables the TensorFlow extensions." OFF) +if (ENABLE_TENSORFLOW) +createExtension(TENSORFLOW-EXTENSIONS "TENSORFLOW EXTENSIONS" "This enables TensorFlow support" "extensions/tensorflow" "${TEST_DIR}/tensorflow-tests") --- End diff -- Does TF provide a source build that we can use as a third-party library, in case we want to build for other targets such as ARM? ---
[GitHub] nifi-minifi-cpp pull request #182: MINIFICPP-274: PutKafka Processor
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/182 MINIFICPP-274: PutKafka Processor Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp kafka Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #182 commit a2abdfeaffaf86ea9dbddae5193988e956829618 Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-11-10T16:50:04Z MINIFICPP-274: PutKafka Processor ---
[GitHub] nifi-minifi-cpp issue #172: MINIFI-218: GetGPS processor implementation
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/172 @phrocker looks good, please merge the two commits into one. ---
[GitHub] nifi-minifi-cpp issue #173: MINIFICPP-284 Handle processor configuration err...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/173 looks good. ---
[GitHub] nifi-minifi-cpp issue #158: MINIFICPP-60: Add initial implementation of Site...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/158 @phrocker please resolve the conflict, squash into a single commit, and update the README accordingly. ---
[GitHub] nifi-minifi-cpp pull request #162: MINIFICPP-273 Simplify configuration of M...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/162#discussion_r147889510 --- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp --- @@ -698,6 +703,34 @@ void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const std::stri } } +YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode, + const std::string , + const YAML::Node , + const std::string , + const std::string ) { + std::string infoMessage = providedInfoMessage; + auto result = yamlNode->as()[fieldName]; + if (!result) { +if (infoMessage.empty()) { + // Build a helpful info message for the user to inform them that a default is being used + infoMessage = + yamlNode->as()["name"] ? + "Using default value for optional field '" + fieldName + "' in component named '" + + yamlNode->as()["name"].as() + "'" : + "Using default value for optional field '" + fieldName + "' "; + if (!yamlSection.empty()) { +infoMessage += " [in '" + yamlSection + "' section of configuration file]: "; + } + + infoMessage += defaultValue.as(); +} +logger_->log_info(infoMessage.c_str()); +result = defaultValue; --- End diff -- The result is never returned. Please run make linter. ---
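The point of the comment above is that a non-void function must return on every path. As a rough illustration only, the sketch below reproduces the "look up an optional field, fall back to a default" shape with a plain `std::map` instead of yaml-cpp; `Section` and `get_optional_field` are hypothetical names, and the message text is simplified from the one in the diff.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for a parsed configuration section; NOT YAML::Node.
using Section = std::map<std::string, std::string>;

// Returns the field's value if present, otherwise the default.
// When the default is used and info_message is non-null, a short
// explanation is written to it so the caller can log it.
std::string get_optional_field(const Section& section,
                               const std::string& field_name,
                               const std::string& default_value,
                               std::string* info_message = nullptr) {
  const auto it = section.find(field_name);
  if (it != section.end()) {
    return it->second;  // field present: return its value
  }
  if (info_message != nullptr) {
    *info_message = "Using default value for optional field '" + field_name +
                    "': " + default_value;
  }
  return default_value;  // field absent: the default is returned explicitly
}
```

Compiling with warnings enabled (for example `-Wall` with GCC or Clang, which includes `-Wreturn-type`) is a common way to catch a missing return like the one flagged in the review.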
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837447 --- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp --- @@ -0,0 +1,312 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "HTTPProtocol.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "PeersEntity.h" +#include "io/CRCStream.h" +#include "sitetosite/Peer.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace sitetosite { + +std::shared_ptr HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator(); + +const std::string HttpSiteToSiteClient::parseTransactionId(const std::string ) { + int i = 0; + for (i = uri.length() - 1; i >= 0; i--) { +if (uri.at(i) == '/') + break; + } + return uri.substr(i + 1, uri.length() - (i + 1)); +} + +std::shared_ptr HttpSiteToSiteClient::createTransaction(std::string , TransferDirection direction) { + std::string dir_str = direction == SEND ? 
"input-ports" : "output-ports"; + std::stringstream uri; + uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions"; + auto client = create_http_client(uri.str(), "POST"); + + client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); + + client->setConnectionTimeout(5); + + client->setContentType("application/json"); + client->appendHeader("Accept: application/json"); + client->setUseChunkedEncoding(); + client->setPostFields(""); + client->submit(); + peer_->setStream(nullptr); + if (client->getResponseCode() == 201) { +// parse the headers +auto headers = client->getParsedHeaders(); +auto intent_name = headers["x-location-uri-intent"]; +if (intent_name == "transaction-url") { + auto url = headers["Location"]; + + if (IsNullOrEmpty()) { +logger_->log_debug("Location is empty"); + } else { + +org::apache::nifi::minifi::io::CRCStream crcstream(peer_.get()); +auto transaction = std::make_shared(direction, crcstream); +transaction->initialize(this, url); +auto transactionId = parseTransactionId(url); +if (IsNullOrEmpty(transactionId)) + return nullptr; +transaction->setTransactionId(transactionId); +std::shared_ptr client; +if (transaction->getDirection() == SEND) { + client = openConnectionForSending(transaction); +} else { + client = openConnectionForReceive(transaction); + transaction->setDataAvailable(true); + // a 201 tells us that data is available. A 200 would mean that nothing is available. 
+} + +client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); +peer_->setStream(std::unique_ptr(new io::HttpStream(client))); +transactionID = transaction->getUUIDStr(); +logger_->log_debug("Created transaction id -%s-", transactionID); +known_transactions_[transaction->getUUIDStr()] = transaction; +return transaction; + } +} else { + logger_->log_debug("Could not create transaction, intent is %s", intent_name); +} + } else { +logger_->log_debug("Could not create transaction, received %d", client->getResponseCode()); + } + return nullptr; +} + +int HttpSiteToSi
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837310 --- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp --- @@ -0,0 +1,312 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "HTTPProtocol.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "PeersEntity.h" +#include "io/CRCStream.h" +#include "sitetosite/Peer.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace sitetosite { + +std::shared_ptr HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator(); + +const std::string HttpSiteToSiteClient::parseTransactionId(const std::string ) { + int i = 0; + for (i = uri.length() - 1; i >= 0; i--) { +if (uri.at(i) == '/') + break; + } + return uri.substr(i + 1, uri.length() - (i + 1)); +} + +std::shared_ptr HttpSiteToSiteClient::createTransaction(std::string , TransferDirection direction) { + std::string dir_str = direction == SEND ? 
"input-ports" : "output-ports"; + std::stringstream uri; + uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions"; + auto client = create_http_client(uri.str(), "POST"); + + client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); + + client->setConnectionTimeout(5); + + client->setContentType("application/json"); + client->appendHeader("Accept: application/json"); + client->setUseChunkedEncoding(); + client->setPostFields(""); + client->submit(); + peer_->setStream(nullptr); + if (client->getResponseCode() == 201) { +// parse the headers +auto headers = client->getParsedHeaders(); +auto intent_name = headers["x-location-uri-intent"]; +if (intent_name == "transaction-url") { + auto url = headers["Location"]; + + if (IsNullOrEmpty()) { +logger_->log_debug("Location is empty"); + } else { + +org::apache::nifi::minifi::io::CRCStream crcstream(peer_.get()); +auto transaction = std::make_shared(direction, crcstream); +transaction->initialize(this, url); +auto transactionId = parseTransactionId(url); +if (IsNullOrEmpty(transactionId)) + return nullptr; +transaction->setTransactionId(transactionId); +std::shared_ptr client; +if (transaction->getDirection() == SEND) { + client = openConnectionForSending(transaction); +} else { + client = openConnectionForReceive(transaction); + transaction->setDataAvailable(true); + // a 201 tells us that data is available. A 200 would mean that nothing is available. 
+} + +client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); +peer_->setStream(std::unique_ptr(new io::HttpStream(client))); +transactionID = transaction->getUUIDStr(); +logger_->log_debug("Created transaction id -%s-", transactionID); +known_transactions_[transaction->getUUIDStr()] = transaction; +return transaction; + } +} else { + logger_->log_debug("Could not create transaction, intent is %s", intent_name); +} + } else { +logger_->log_debug("Could not create transaction, received %d", client->getResponseCode()); + } + return nullptr; +} + +int HttpSiteToSi
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837119 --- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp --- @@ -0,0 +1,312 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "HTTPProtocol.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "PeersEntity.h" +#include "io/CRCStream.h" +#include "sitetosite/Peer.h" +#include "io/validation.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace sitetosite { + +std::shared_ptr HttpSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator(); + +const std::string HttpSiteToSiteClient::parseTransactionId(const std::string ) { + int i = 0; + for (i = uri.length() - 1; i >= 0; i--) { +if (uri.at(i) == '/') + break; + } + return uri.substr(i + 1, uri.length() - (i + 1)); +} + +std::shared_ptr HttpSiteToSiteClient::createTransaction(std::string , TransferDirection direction) { + std::string dir_str = direction == SEND ? 
"input-ports" : "output-ports"; + std::stringstream uri; + uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() << "/transactions"; + auto client = create_http_client(uri.str(), "POST"); + + client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); + + client->setConnectionTimeout(5); + + client->setContentType("application/json"); + client->appendHeader("Accept: application/json"); + client->setUseChunkedEncoding(); + client->setPostFields(""); + client->submit(); + peer_->setStream(nullptr); + if (client->getResponseCode() == 201) { +// parse the headers +auto headers = client->getParsedHeaders(); +auto intent_name = headers["x-location-uri-intent"]; +if (intent_name == "transaction-url") { + auto url = headers["Location"]; + + if (IsNullOrEmpty()) { +logger_->log_debug("Location is empty"); + } else { + +org::apache::nifi::minifi::io::CRCStream crcstream(peer_.get()); +auto transaction = std::make_shared(direction, crcstream); +transaction->initialize(this, url); +auto transactionId = parseTransactionId(url); +if (IsNullOrEmpty(transactionId)) + return nullptr; +transaction->setTransactionId(transactionId); +std::shared_ptr client; +if (transaction->getDirection() == SEND) { + client = openConnectionForSending(transaction); +} else { + client = openConnectionForReceive(transaction); + transaction->setDataAvailable(true); + // a 201 tells us that data is available. A 200 would mean that nothing is available. 
+} + +client->appendHeader(PROTOCOL_VERSION_HEADER, "1"); +peer_->setStream(std::unique_ptr(new io::HttpStream(client))); +transactionID = transaction->getUUIDStr(); +logger_->log_debug("Created transaction id -%s-", transactionID); +known_transactions_[transaction->getUUIDStr()] = transaction; +return transaction; + } +} else { + logger_->log_debug("Could not create transaction, intent is %s", intent_name); +} + } else { +logger_->log_debug("Could not create transaction, received %d", client->getResponseCode()); + } + return nullptr; +} + +int HttpSiteToSi
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147747069 --- Diff: extensions/http-curl/client/HTTPCallback.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ + +#include "concurrentqueue.h" +#include +#include +#include +#include + +#include "utils/ByteArrayCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * will stream as items are processed. 
+ */ +class HttpStreamingCallback : public ByteInputCallBack { + public: + HttpStreamingCallback() + : ptr(nullptr), +is_alive_(true) { +previous_pos_ = 0; +rolling_count_ = 0; + } + + virtual ~HttpStreamingCallback() { + + } + + void close() { +is_alive_ = false; +cv.notify_all(); + } + + virtual void seek(size_t pos) { +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + } + + virtual int64_t process(std::shared_ptr stream) { + +std::vector vec; + +if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize()); +} + +size_t added_size = vec.size(); + +byte_arrays_.enqueue(std::move(vec)); + +cv.notify_all(); + +return added_size; + + } + + virtual int64_t process(uint8_t *vector, size_t size) { + +std::vector vec; + +if (size > 0) { + vec.resize(size); + + memcpy(vec.data(), vector, size); + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); + + cv.notify_all(); + + return added_size; +} else { + return 0; +} + + } + + virtual void write(std::string content) { +std::vector vec; +vec.assign(content.begin(), content.end()); +byte_arrays_.enqueue(vec); + } + + virtual char *getBuffer(size_t pos) { + +// if there is no space remaining in our current buffer, +// we should load the next. 
If none exists after that we have no more buffer +std::lock_guard lock(mutex_); + +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + +if (ptr == nullptr) + return nullptr; + +size_t absolute_position = pos - previous_pos_; + +current_pos_ = pos; +for (int i = 0; i < current_vec_.size(); i++) { +} + +return ptr + absolute_position; + } + + virtual const size_t getRemaining(size_t pos) { +return current_vec_.size(); + } + + virtual const size_t getBufferSize() { +std::lock_guard lock(mutex_); + +if (ptr == nullptr || current_pos_ >= rolling_count_) { + load_buffer(); +} +return rolling_count_; + } + + private: + + inline void load_buffer() { +std::unique_lock lock(mutex_); +cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;}); +if (!is_alive_ && byte_arrays_.size_approx() == 0) { + return; --- End diff -- I thought that once the wait returns we own the lock, and that is the reason you unlock afterwards. ---
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147739186 --- Diff: extensions/http-curl/client/HTTPCallback.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ + +#include "concurrentqueue.h" +#include +#include +#include +#include + +#include "utils/ByteArrayCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * will stream as items are processed. 
+ */ +class HttpStreamingCallback : public ByteInputCallBack { + public: + HttpStreamingCallback() + : ptr(nullptr), +is_alive_(true) { +previous_pos_ = 0; +rolling_count_ = 0; + } + + virtual ~HttpStreamingCallback() { + + } + + void close() { +is_alive_ = false; +cv.notify_all(); + } + + virtual void seek(size_t pos) { +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + } + + virtual int64_t process(std::shared_ptr stream) { + +std::vector vec; + +if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize()); +} + +size_t added_size = vec.size(); + +byte_arrays_.enqueue(std::move(vec)); + +cv.notify_all(); + +return added_size; + + } + + virtual int64_t process(uint8_t *vector, size_t size) { + +std::vector vec; + +if (size > 0) { + vec.resize(size); + + memcpy(vec.data(), vector, size); + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); + + cv.notify_all(); + + return added_size; +} else { + return 0; +} + + } + + virtual void write(std::string content) { +std::vector vec; +vec.assign(content.begin(), content.end()); +byte_arrays_.enqueue(vec); + } + + virtual char *getBuffer(size_t pos) { + +// if there is no space remaining in our current buffer, +// we should load the next. 
If none exists after that we have no more buffer +std::lock_guard lock(mutex_); + +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + +if (ptr == nullptr) + return nullptr; + +size_t absolute_position = pos - previous_pos_; + +current_pos_ = pos; +for (int i = 0; i < current_vec_.size(); i++) { +} + +return ptr + absolute_position; + } + + virtual const size_t getRemaining(size_t pos) { +return current_vec_.size(); + } + + virtual const size_t getBufferSize() { +std::lock_guard lock(mutex_); + +if (ptr == nullptr || current_pos_ >= rolling_count_) { + load_buffer(); +} +return rolling_count_; + } + + private: + + inline void load_buffer() { +std::unique_lock lock(mutex_); +cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || is_alive_==false;}); +if (!is_alive_ && byte_arrays_.size_approx() == 0) { + return; --- End diff -- unlock the mutex before return? ---
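Editorial sketch on the load_buffer() question above, under stated assumptions: ChunkQueue, push, pop and close are hypothetical names, and a plain std::deque stands in for the concurrent queue used in the PR. It only illustrates standard library behaviour: std::condition_variable::wait re-acquires the mutex before it returns, and std::unique_lock releases it automatically on every return path, so an early return needs no explicit unlock.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for the streaming callback's buffer hand-off.
class ChunkQueue {
 public:
  // Producer side: store a chunk, then wake any waiting consumer.
  void push(std::vector<char> chunk) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      chunks_.push_back(std::move(chunk));
    }
    cv_.notify_all();
  }

  // Marks the queue as closed so blocked consumers can give up.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      alive_ = false;
    }
    cv_.notify_all();
  }

  // Consumer side: returns false when nothing is queued and the queue is closed.
  bool pop(std::vector<char> &out) {
    std::unique_lock<std::mutex> lock(mutex_);
    // wait() releases the mutex while blocked and re-acquires it before
    // returning, so everything below runs with the lock held.
    cv_.wait(lock, [this] { return !chunks_.empty() || !alive_; });
    assert(lock.owns_lock());
    if (chunks_.empty()) {
      return false;  // early return: the unique_lock destructor unlocks here
    }
    out = std::move(chunks_.front());
    chunks_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::vector<char>> chunks_;
  bool alive_ = true;
};
```

If the early return left the mutex locked, any later push() or pop() on the same object would block forever; with the RAII lock that cannot happen.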
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147738287 --- Diff: extensions/http-curl/client/HTTPCallback.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ + +#include "concurrentqueue.h" +#include +#include +#include +#include + +#include "utils/ByteArrayCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * will stream as items are processed. 
+ */ +class HttpStreamingCallback : public ByteInputCallBack { + public: + HttpStreamingCallback() + : ptr(nullptr), +is_alive_(true) { +previous_pos_ = 0; +rolling_count_ = 0; + } + + virtual ~HttpStreamingCallback() { + + } + + void close() { +is_alive_ = false; +cv.notify_all(); + } + + virtual void seek(size_t pos) { +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + } + + virtual int64_t process(std::shared_ptr stream) { + +std::vector vec; + +if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize()); +} + +size_t added_size = vec.size(); + +byte_arrays_.enqueue(std::move(vec)); + +cv.notify_all(); + +return added_size; + + } + + virtual int64_t process(uint8_t *vector, size_t size) { + +std::vector vec; + +if (size > 0) { + vec.resize(size); + + memcpy(vec.data(), vector, size); + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); + + cv.notify_all(); + + return added_size; +} else { + return 0; +} + + } + + virtual void write(std::string content) { +std::vector vec; +vec.assign(content.begin(), content.end()); +byte_arrays_.enqueue(vec); --- End diff -- same as above ---
[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147738083 --- Diff: extensions/http-curl/client/HTTPCallback.h --- @@ -0,0 +1,187 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_ + +#include "concurrentqueue.h" +#include +#include +#include +#include + +#include "utils/ByteArrayCallback.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +/** + * will stream as items are processed. 
+ */ +class HttpStreamingCallback : public ByteInputCallBack { + public: + HttpStreamingCallback() + : ptr(nullptr), +is_alive_(true) { +previous_pos_ = 0; +rolling_count_ = 0; + } + + virtual ~HttpStreamingCallback() { + + } + + void close() { +is_alive_ = false; +cv.notify_all(); + } + + virtual void seek(size_t pos) { +if ((pos - previous_pos_) >= current_vec_.size() || current_vec_.size() == 0) + load_buffer(); + } + + virtual int64_t process(std::shared_ptr stream) { + +std::vector vec; + +if (stream->getSize() > 0) { + vec.resize(stream->getSize()); + + stream->readData(reinterpret_cast<uint8_t*>(vec.data()), stream->getSize()); +} + +size_t added_size = vec.size(); + +byte_arrays_.enqueue(std::move(vec)); + +cv.notify_all(); + +return added_size; + + } + + virtual int64_t process(uint8_t *vector, size_t size) { + +std::vector vec; + +if (size > 0) { + vec.resize(size); + + memcpy(vec.data(), vector, size); + + size_t added_size = vec.size(); + + byte_arrays_.enqueue(std::move(vec)); --- End diff -- do we need to lock the mutex_ before enqueue? ---
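Editorial sketch on the locking question above, under stated assumptions: Signal, publish, consume and run_round_trip are hypothetical names, and an atomic counter stands in for the lock-free queue. A thread-safe container does not need the mutex to protect its own contents, but the state read by the wait predicate should still be changed while holding the mutex. Otherwise the change and the notify can land between a waiter's predicate check and the moment it blocks, and the wakeup is lost.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in: `pending` plays the role of the lock-free queue size.
struct Signal {
  std::mutex mutex;
  std::condition_variable cv;
  std::atomic<int> pending{0};

  // Publish one item. The counter is atomic, but the increment still happens
  // under the mutex so it cannot slip in between a waiter's predicate check
  // and the point where that waiter starts blocking.
  void publish() {
    {
      std::lock_guard<std::mutex> lock(mutex);
      pending.fetch_add(1);
    }
    cv.notify_all();
  }

  // Block until at least one item is pending, then consume it.
  void consume() {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [this] { return pending.load() > 0; });
    pending.fetch_sub(1);
  }
};

// Runs one producer thread and consumes `n` items on the calling thread.
inline int run_round_trip(int n) {
  assert(n >= 0);
  Signal s;
  std::thread producer([&] {
    for (int i = 0; i < n; ++i) s.publish();
  });
  int consumed = 0;
  for (int i = 0; i < n; ++i) {
    s.consume();
    ++consumed;
  }
  producer.join();
  return consumed;
}
```

The notify itself is issued after the lock is released, which avoids waking the consumer only to have it block on the mutex again.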
[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/151 Refactored to use the libarchive extension. ---
[GitHub] nifi-minifi-cpp issue #154: MINIFICPP-265 Disabled fsanitize=address for now...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/154 looks good. ---
[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/151 @apiri @phrocker fixed the test case and changed the size and offset signatures to uint64_t; it looks like Travis passed. Please review. ---
[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/151 @phrocker the base class is io::BaseStream, and its signature is virtual void seek(uint32_t offset), so a seek that takes uint64_t breaks the virtual override. The CompressContent test passes on my local machine but fails in Travis; I will debug more. ---
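Editorial sketch of the override problem described above, under stated assumptions: the three structs and seek_reaches_derived are hypothetical miniatures, not the real io::BaseStream hierarchy. It shows the general C++ rule that a derived seek with a different parameter type hides the base function instead of overriding it, so calls made through the base interface never reach it.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical miniature of the stream hierarchy discussed above.
struct BaseStream {
  virtual ~BaseStream() = default;
  virtual void seek(std::uint32_t offset) {
    last_seek = offset;
    reached_derived = false;
  }
  std::uint64_t last_seek = 0;
  bool reached_derived = false;
};

// Different parameter type: this declares a new function that hides
// BaseStream::seek rather than overriding it. Adding `override` here
// would turn the mismatch into a compile error.
struct WideSeekStream : BaseStream {
  virtual void seek(std::uint64_t offset) {
    last_seek = offset;
    reached_derived = true;
  }
};

// Matching parameter type: a real override, verified by the compiler.
struct MatchingSeekStream : BaseStream {
  void seek(std::uint32_t offset) override {
    last_seek = offset;
    reached_derived = true;
  }
};

// Calls seek through the base interface, as framework code would.
inline bool seek_reaches_derived(BaseStream &stream) {
  stream.seek(42);
  return stream.reached_derived;
}
```

Marking every intended override with the `override` keyword makes this class of signature drift fail at compile time instead of at run time.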
[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/151 Implemented CompressContent and tested it with a random 100K payload in the gzip, bzip, lzma, and xz-lzma formats. Fixed the seek issue in the stream. ---
[GitHub] nifi-minifi-cpp pull request #151: MINIFICPP-264: CompressContent Processor
GitHub user minifirocks opened a pull request: https://github.com/apache/nifi-minifi-cpp/pull/151 MINIFICPP-264: CompressContent Processor Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/minifirocks/nifi-minifi-cpp compress_content Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi-minifi-cpp/pull/151.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #151 commit a2617f78d26e502112106e0fe92395d41c1c303a Author: Bin Qiu <benqiu2...@gmail.com> Date: 2017-10-24T03:17:01Z MINIFICPP-264: CompressContent Processor ---
[GitHub] nifi-minifi-cpp pull request #148: MINIFI-244 Un/FocusArchive processors
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r146120896 --- Diff: libminifi/include/core/ProcessSession.h --- @@ -151,11 +152,47 @@ class ProcessSession { bool keepSource, uint64_t offset, char inputDelimiter); + /** + * Exports the data stream to a file + * @param string file to export stream to + * @param flow flow file + * @param bool whether or not to keep the content in the flow file + */ + bool exportContent(const std::string , --- End diff -- The ArchiveMerge class in MergeContent.h does not rely on persistent storage; it uses archive_write_open(arch, this, NULL, archive_write, NULL); to write the processed content from RAM into the flow file. ---
[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...
Github user minifirocks closed the pull request at: https://github.com/apache/nifi-minifi-cpp/pull/146 ---
[GitHub] nifi-minifi-cpp issue #150: MINIFI-372: Resolve issues with missed commits
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/150 @phrocker please merge these two commits into one ---
[GitHub] nifi-minifi-cpp issue #150: MINIFI-372: Resolve issues with missed commits
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/150 @phrocker looks good. ---
[GitHub] nifi-minifi-cpp issue #146: MINIFICPP-72: Add Tar and Zip Support for MergeC...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/146 @apiri @phrocker rebased. ---
[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/147 @phrocker merged to apache; please close the PR. ---
[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/147 https://github.com/apache/nifi-minifi-cpp/commit/49ed5094552fe98c289d15168587ff3e63042309 ---
[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/147 @phrocker looks good. ---
[GitHub] nifi-minifi-cpp issue #142: MINIFI-372: Replace leveldb with RocksDB
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/142 @phrocker overall looks good; please remove the tests, docs, and other unneeded files from the rocksdb source. ---
[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144901439 --- Diff: LICENSE --- @@ -564,4 +564,68 @@ ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE --- End diff -- @apiri fix the license ---
[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144900901 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque<std::shared_ptr> , core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque<std::shared_ptr> _; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { 
+archive_write_set_format_pax_restricted(arch); // tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { --- End diff -- @apiri @phrocker fixed. ---
[GitHub] nifi-minifi-cpp issue #146: MINIFICPP-72: Add Tar and Zip Support for MergeC...
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/146 @apiri removed the tests and docs from libarchive and updated LICENSE to add each author. ---
[GitHub] nifi-minifi-cpp issue #146: Archive merge
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/146 The incremental read has already been added to address Marc's comments. ---
[GitHub] nifi-minifi-cpp issue #146: Archive merge
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/146 @phrocker @apiri please let me know if you have comments. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144026084 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- it is FlowConfiguration.h which include MergeContent.h ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912126 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr> , std::string , +std::string , std::string ) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, ); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; --- End diff -- Same as the zip case below: we will tar the tar files into a single tarball. ---
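Editorial sketch of the naming logic in the quoted merge() code, under stated assumptions: Attributes, get_attribute and merged_file_name are hypothetical helpers, the attribute keys "filename" and "segment.original.filename" are assumed values for the constants in the diff, and get_attribute is assumed to leave its output untouched when the key is missing. It shows why merging inputs that are already archives gives a name with a doubled extension.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a flow file: just its attribute map.
using Attributes = std::map<std::string, std::string>;

// Copies the attribute into `out` only when the key exists (assumed to
// mirror how getAttribute() behaves in the quoted code).
inline bool get_attribute(const Attributes &attrs, const std::string &key, std::string &out) {
  auto it = attrs.find(key);
  if (it == attrs.end()) return false;
  out = it->second;
  return true;
}

// Derives the merged flow file's name: start from the merged file's own
// name, prefer a name taken from the first input, then append the
// archive extension if the result is non-empty.
inline std::string merged_file_name(const Attributes &merged,
                                    const std::vector<Attributes> &flows,
                                    const std::string &extension) {
  assert(!flows.empty());
  std::string name;
  get_attribute(merged, "filename", name);
  if (flows.size() == 1) {
    get_attribute(flows.front(), "filename", name);
  } else {
    get_attribute(flows.front(), "segment.original.filename", name);
  }
  if (!name.empty()) name += extension;
  return name;
}
```

With a single input named logs.tar and the extension ".tar", this sketch yields logs.tar.tar, which matches the behaviour described in the comment: the existing archive is wrapped in a new one rather than unpacked.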
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912068 --- Diff: extensions/http-curl/CMakeLists.txt --- @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/) --- End diff -- Scanning dependencies of target minifi-http-curl [ 15%] Building CXX object extensions/http-curl/CMakeFiles/minifi-http-curl.dir/HttpCurlLoader.cpp.o In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/HttpCurlLoader.cpp:20: In file included from /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/core/FlowConfiguration.h:37: /Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/processors/MergeContent.h:24:10: fatal error: 'archive_entry.h' file not found ---
[GitHub] nifi-minifi-cpp issue #146: Archive merge
Github user minifirocks commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/146 @phrocker please review the same. @apiri @jdye64 please review the LICENSE file for libarchive to see whether it is OK. Joe raised some concerns in the last PR. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143552652 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr> , std::string , +std::string , std::string ) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, ); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr> , std::string , +std::string , std::string ) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, ); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".zip"; --- End diff -- If two separate zip files fall into the same bin, we will create a single zip file that contains both of them. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551961 --- Diff: libminifi/src/processors/MergeContent.cpp --- @@ -276,6 +287,46 @@ std::shared_ptr BinaryConcatenationMerge::merge(core::ProcessCon return flowFile; } +std::shared_ptr TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr> , std::string , +std::string , std::string ) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session); + session->write(flowFile, ); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += ".tar"; +session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName); + } + return flowFile; +} + +std::shared_ptr ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr> , std::string , +std::string , std::string ) { + std::shared_ptr flowFile = std::static_pointer_cast < FlowFileRecord > (session->create()); + ArchiveMerge::WriteCallback callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session); + session->write(flowFile, ); + session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), this->getMergedContentType()); + std::string fileName; + flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName); + if (flows.size() == 1) { +flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName); + } else { +flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, fileName); + } + if (!fileName.empty()) { +fileName += 
".zip"; --- End diff -- Yes, we will zip the two separate zip files into a larger one. ---
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque<std::shared_ptr> , core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque<std::shared_ptr> _; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { 
+archive_write_set_format_pax_restricted(arch); // tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { +} + } +} +ReadCallback readCb(flow->getSize(), arch, entry); --- End diff -- will try to do incremental read. ---
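A minimal sketch of what an incremental read could look like, assuming the goal is to stop sizing the buffer to the whole flow file and copy in fixed-size chunks instead. `MemoryStream`, `Sink`, and `copyInChunks` are hypothetical stand-ins for illustration only; they are not the MiNiFi stream classes or the libarchive calls from the diff, and the 4096-byte default is an arbitrary choice.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the flow file content stream (not the MiNiFi API).
class MemoryStream {
 public:
  explicit MemoryStream(std::string data) : data_(std::move(data)) {}

  // Copies up to len bytes into buf and returns the number of bytes copied.
  // Returns 0 once all data has been consumed.
  int64_t read(uint8_t *buf, std::size_t len) {
    const std::size_t n = std::min(len, data_.size() - pos_);
    std::memcpy(buf, data_.data() + pos_, n);
    pos_ += n;
    return static_cast<int64_t>(n);
  }

 private:
  std::string data_;
  std::size_t pos_ = 0;
};

// Hypothetical sink standing in for the archive writer.
struct Sink {
  std::string bytes;

  // Appends len bytes and reports how many were accepted.
  int64_t write(const uint8_t *buf, std::size_t len) {
    bytes.append(reinterpret_cast<const char *>(buf), len);
    return static_cast<int64_t>(len);
  }
};

// Copies the whole stream to the sink using a fixed-size heap buffer.
// Returns the total number of bytes copied, or -1 on error.
int64_t copyInChunks(MemoryStream &in, Sink &out, std::size_t chunkSize = 4096) {
  if (chunkSize == 0) {
    return -1;  // a zero-sized buffer could never make progress
  }
  std::vector<uint8_t> buffer(chunkSize);
  int64_t total = 0;
  for (;;) {
    const int64_t n = in.read(buffer.data(), buffer.size());
    if (n < 0) {
      return -1;  // read error
    }
    if (n == 0) {
      break;  // end of stream
    }
    // Forward only the bytes that were actually read in this iteration.
    if (out.write(buffer.data(), static_cast<std::size_t>(n)) != n) {
      return -1;  // short or failed write
    }
    total += n;
  }
  return total;
}
```

With this shape the memory used per entry is bounded by the chunk size rather than by `flow->getSize()`, and the archive header would be written once before the loop rather than inside it.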
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551088 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) +read_size = stream->getSize(); + else +read_size = buffer_size_; + ret = archive_write_header(arch_, entry_); + ret += archive_write_data(arch_, buffer, read_size); + return ret; +} +uint64_t buffer_size_; +struct archive *arch_; +struct archive_entry *entry_; + }; + // Nest Callback Class for write stream + class WriteCallback: public OutputStreamCallback { + public: +WriteCallback(std::string merge_type, std::deque<std::shared_ptr> , core::ProcessSession *session) : +merge_type_(merge_type), flows_(flows), session_(session) { + size_ = 0; + stream_ = nullptr; +} +~WriteCallback() { +} + +std::string merge_type_; +std::deque<std::shared_ptr> _; +core::ProcessSession *session_; +std::shared_ptr stream_; +int64_t size_; + +static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) { + WriteCallback *callback = (WriteCallback *) context; + la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size); + if (ret > 0) +callback->size_ += (int64_t) ret; + return ret; +} + +int64_t process(std::shared_ptr stream) { + int64_t ret = 0; + struct archive *arch; + + arch = archive_write_new(); + if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { 
+archive_write_set_format_pax_restricted(arch); // tar format + } + if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) { +archive_write_set_format_zip(arch); // zip format + } + archive_write_set_bytes_per_block(arch, 0); + archive_write_add_filter_none(arch); + this->stream_ = stream; + archive_write_open(arch, this, NULL, archive_write, NULL); + + for (auto flow : flows_) { +struct archive_entry *entry = archive_entry_new(); +std::string fileName; +flow->getAttribute(FlowAttributeKey(FILENAME), fileName); +archive_entry_set_pathname(entry, fileName.c_str()); +archive_entry_set_size(entry, flow->getSize()); +archive_entry_set_mode(entry, S_IFREG | 0755); +if (merge_type_ == MERGE_FORMAT_TAR_VALUE) { + std::string perm; + int permInt; + if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, perm)) { +try { + permInt = std::stoi(perm); + archive_entry_set_perm(entry, (mode_t) permInt); +} catch (...) { --- End diff -- will do. ---
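The review comment being answered is not quoted here. Assuming it concerned the empty `catch (...)` around `std::stoi`, one possible shape is a small helper that reports failure explicitly so the caller can log it or fall back to the default mode. `parsePermissions` is a hypothetical name, and the range check against `07777` is an added assumption; the decimal parse mirrors the quoted diff.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Parses a permission attribute value into perm.
// Returns false, leaving perm untouched, when the text is not a valid
// number, has trailing characters, or falls outside the mode bits.
bool parsePermissions(const std::string &text, int &perm) {
  try {
    std::size_t consumed = 0;
    // Decimal parse, matching the std::stoi(perm) call in the quoted diff.
    const int value = std::stoi(text, &consumed);
    if (consumed != text.size() || value < 0 || value > 07777) {
      return false;
    }
    perm = value;
    return true;
  } catch (const std::invalid_argument &) {
    return false;  // no digits at all, e.g. "" or "abc"
  } catch (const std::out_of_range &) {
    return false;  // value does not fit in an int
  }
}
```

The caller would then invoke `archive_entry_set_perm` only when the helper returns true, and could log the rejected value otherwise.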
[GitHub] nifi-minifi-cpp pull request #146: Archive merge
Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143550805 --- Diff: libminifi/include/processors/MergeContent.h --- @@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin { }; +// Archive Class +class ArchiveMerge { +public: + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: +ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) : +buffer_size_(size), arch_(arch), entry_(entry) { +} +~ReadCallback() { +} +int64_t process(std::shared_ptr stream) { + uint8_t buffer[buffer_size_]; + int64_t ret = 0; + uint64_t read_size; + ret = stream->read(buffer, buffer_size_); + if (!stream) --- End diff -- `!stream` means that we hit EOF during the read; in that case we did not get the full size. ---
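As quoted, `stream` appears to be a `std::shared_ptr` (the template argument was lost in the archive). For a `std::shared_ptr`, `!stream` is true only when the pointer is null, so it cannot signal a short read. A minimal sketch of detecting a short read from the count returned by `read` instead, assuming `read` returns the number of bytes it delivered. `FakeStream` and `validBytes` are hypothetical names used for illustration, not MiNiFi types.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical stream that has only `available` bytes left to deliver.
struct FakeStream {
  std::size_t available = 0;

  // Fills buf with up to len zero bytes and returns how many were delivered.
  int64_t read(uint8_t *buf, std::size_t len) {
    const std::size_t n = std::min(len, available);
    std::fill_n(buf, n, static_cast<uint8_t>(0));
    available -= n;
    return static_cast<int64_t>(n);
  }
};

// Returns how many bytes in buf are valid after one read attempt.
std::size_t validBytes(const std::shared_ptr<FakeStream> &stream,
                       uint8_t *buf, std::size_t len) {
  if (!stream) {
    return 0;  // null pointer check only; this says nothing about EOF
  }
  const int64_t n = stream->read(buf, len);
  // A result smaller than len is a short read; negative means an error.
  return n > 0 ? static_cast<std::size_t>(n) : 0;
}
```

Passing the returned count to the archive writer, rather than `buffer_size_` or `stream->getSize()`, keeps the amount written equal to the amount actually read.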