[GitHub] nifi-minifi-cpp issue #306: MINIFICPP-457: Network Management Controller Ser...

2018-05-01 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/306
  
@achristianson please review


---


[GitHub] nifi-minifi-cpp issue #301: MINIFICPP-459 Include FLexLexer.h in thirdparty ...

2018-05-01 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/301
  
looks good


---


[GitHub] nifi-minifi-cpp pull request #313: MINIFICPP-403: Add version into flow attr...

2018-05-01 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/313#discussion_r185257853
  
--- Diff: libminifi/test/resources/TestHTTPGet.yml ---
@@ -19,6 +19,7 @@
 Flow Controller:
 name: MiNiFi Flow
 id: 2438e3c8-015a-1000-79ca-83af40ec1990
+version: 1
--- End diff --

@phrocker that's flow config yml version, i thought it refer to the the 
flow version. when you update the YML file, flow version change also


---


[GitHub] nifi-minifi-cpp pull request #313: MINIFICPP-403: Add version into flow attr...

2018-04-26 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/313

MINIFICPP-403: Add version into flow attributes

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp 
add_version_to_flow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #313






---


[GitHub] nifi-minifi-cpp issue #306: MINIFICPP-457: Network Management Controller Ser...

2018-04-25 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/306
  
@achristianson add test case


---


[GitHub] nifi-minifi-cpp pull request #306: MINIFICPP-457: Network Management Control...

2018-04-23 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/306

MINIFICPP-457: Network Management Controller Service for interface bi…

…nding

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp 
network_controller_interface_bind

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #306


commit ec65487e2ab6936c49c6822b9a3966e66e756379
Author: Bin Qiu <benqiu2016@...>
Date:   2018-04-23T15:39:30Z

MINIFICPP-457: Network Management Controller Service for interface binding




---


[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging

2018-04-18 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/295
  
@phrocker so if that's the case, we can remove the below meta info when the 
meta info container was constructed and just provide the framework, if user 
want to use that, they can create their own meta info and add this to the meta 
info container. **we provide a framework to let them add their own meta info 
into the flow file for mutable meta info.** 

 : config_(configure) {
 +// add version, serial number as default meta info
 +std::unique_ptr version = std::unique_ptr < 
core::MetaInfo > (new VersionMetaInfo());
 +addMetaInfo(std::move(version));
 +std::string serial_number;
 +config_->get("device.id", serial_number);
 +state::metrics::Device device;
 +if (serial_number.empty()) {
 +  // we did not config serial number, use the mac address
 +  serial_number = device.device_id_;
 +}
 +std::unique_ptr serial_number_meta_info = 
std::unique_ptr < core::MetaInfo >(new MetaInfo("device.id", serial_number));
 +addMetaInfo(std::move(serial_number_meta_info));
 +std::unique_ptr hostname_meta_info = std::unique_ptr 
< core::MetaInfo >(new MetaInfo("hostname", device.canonical_hostname_));
 +addMetaInfo(std::move(hostname_meta_info));


---


[GitHub] nifi-minifi-cpp issue #297: MINIFICPP-446 Add escape/unescape HTML3 EL funct...

2018-04-16 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/297
  
looks good


---


[GitHub] nifi-minifi-cpp issue #299: MINIFICPP-454: Fix apt-get install statement

2018-04-16 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/299
  
@phrocker looks good, please merge to 1 commit


---


[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging

2018-04-16 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/295
  
@phrocker please let me know whether you have more comments. Thanks.


---


[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging

2018-04-11 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/295
  
@phrocker please review
change shared_ptr to unique ptr
use agent.version and flow.version for meta info



---


[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging

2018-04-06 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/295
  
@phrocker the reason that i do not want to use processor is that meta info 
below to flow which is generated by the any processor. we provide a framework 
let user to write their own meta info and add to the container. for example, 
they can add vendor specified meta info. as for the shared ptr, i can change to 
unique ptr. i pick share ptr because it make it more flexible in case it need 
to be used somewhere besides the container.


---


[GitHub] nifi-minifi-cpp pull request #295: MINFICPP-403: Flow Meta tagging

2018-04-06 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/295#discussion_r179779744
  
--- Diff: libminifi/include/core/MetaInfo.h ---
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __META_INFO_H__
+#define __META_INFO_H__
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include "utils/StringUtils.h"
+#include "core/FlowFile.h"
+#include "core/state/metrics/DeviceInformation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// MetaInfo Class
+class MetaInfo {
+
+ public:
+  // Constructor
+  /*!
+   * Create a new meta info
+   */
+  explicit MetaInfo(const std::string , const std::string )
+  : name_(name),
+value_(value) {
+  }
+
+  // Destructor
+  virtual ~MetaInfo() {
+  }
+
+  // Get Name for the meta info
+  std::string getName() {
+return name_;
+  }
+  // Get value for the meta info, overwritten by specific child meta info
+  virtual std::string getValue(const std::shared_ptr 
) {
+return value_;
+  }
+
+ protected:
+  // Name
+  std::string name_;
+  // Value
+  std::string value_;
+
+ private:
+};
+
+#ifdef MINIFI_VERSION
--- End diff --

will do


---


[GitHub] nifi-minifi-cpp issue #295: MINFICPP-403: Flow Meta tagging

2018-04-05 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/295
  
@phrocker Thanks for the review. the PR is not only for version, it provide 
a flexible framework to add other meta info also. also in the cmake file, we 
already specify major/minor/patch version, why we need another 
generateVersion.sh. 


---


[GitHub] nifi-minifi-cpp pull request #295: MINFICPP-403: Flow Meta tagging

2018-04-05 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/295

MINFICPP-403: Flow Meta tagging

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp meta_info

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #295


commit 22f52fb9d225233ba63df04eb8c91ca9af94a4ba
Author: Bin Qiu <benqiu2016@...>
Date:   2018-04-03T15:57:23Z

MINFICPP-403: Flow Meta tagging




---


[GitHub] nifi-minifi-cpp pull request #280: MINIFICPP-404: http proxy support for s2s

2018-03-14 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/280

MINIFICPP-404: http proxy support for s2s

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp http_proxy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #280


commit e3ae4cdae5e1b474a4bab0b8cb5fe530f6a7f741
Author: Bin Qiu <benqiu2016@...>
Date:   2018-03-08T16:05:35Z

MINIFICPP-404: http proxy support for s2s




---


[GitHub] nifi-minifi-cpp pull request #268: MINIFICPP-397 Added implementation of Rou...

2018-02-28 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/268#discussion_r171389409
  
--- Diff: libminifi/src/processors/RouteOnAttribute.cpp ---
@@ -0,0 +1,107 @@
+/**
+ * @file RouteOnAttribute.cpp
+ * RouteOnAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "processors/RouteOnAttribute.h"
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship RouteOnAttribute::Unmatched(
+"unmatched",
+"Files which do not match any expression are routed here");
+core::Relationship RouteOnAttribute::Failure(
+"failure",
+"Failed files are transferred to failure");
+
+void RouteOnAttribute::initialize() {
+  std::set properties;
+  setSupportedProperties(properties);
+}
+
+void RouteOnAttribute::onDynamicPropertyModified(const core::Property 
_property,
+ const core::Property 
_property) {
+
+  // Update the routing table when routes are added via dynamic properties.
+  route_properties_[new_property.getName()] = new_property;
+
+  std::set relationships;
+
+  for (const auto  : route_properties_) {
+core::Relationship route_rel{route.first, "Dynamic route"};
+route_rels_[route.first] = route_rel;
+relationships.insert(route_rel);
+logger_->log_info("RouteOnAttribute registered route '%s' with 
expression '%s'",
+  route.first,
+  route.second.getValue());
+  }
+
+  relationships.insert(Unmatched);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
--- End diff --

OK.


---


[GitHub] nifi-minifi-cpp pull request #268: MINIFICPP-397 Added implementation of Rou...

2018-02-28 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/268#discussion_r171293683
  
--- Diff: libminifi/src/processors/RouteOnAttribute.cpp ---
@@ -0,0 +1,107 @@
+/**
+ * @file RouteOnAttribute.cpp
+ * RouteOnAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "processors/RouteOnAttribute.h"
+
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship RouteOnAttribute::Unmatched(
+"unmatched",
+"Files which do not match any expression are routed here");
+core::Relationship RouteOnAttribute::Failure(
+"failure",
+"Failed files are transferred to failure");
+
+void RouteOnAttribute::initialize() {
+  std::set properties;
+  setSupportedProperties(properties);
+}
+
+void RouteOnAttribute::onDynamicPropertyModified(const core::Property 
_property,
+ const core::Property 
_property) {
+
+  // Update the routing table when routes are added via dynamic properties.
+  route_properties_[new_property.getName()] = new_property;
+
+  std::set relationships;
+
+  for (const auto  : route_properties_) {
+core::Relationship route_rel{route.first, "Dynamic route"};
+route_rels_[route.first] = route_rel;
+relationships.insert(route_rel);
+logger_->log_info("RouteOnAttribute registered route '%s' with 
expression '%s'",
+  route.first,
+  route.second.getValue());
+  }
+
+  relationships.insert(Unmatched);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
--- End diff --

bool Connectable::setSupportedRelationships(std::set 
relationships) {
  if (isRunning()) {
logger_->log_warn("Can not set processor supported relationship while 
the process %s is running", name_);
return false;
  }
what if we do the onDynamicPropertyModified while the processor is running


---


[GitHub] nifi-minifi-cpp pull request #272: MINIFICPP-405: RPG bind to local interfac...

2018-02-26 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/272

MINIFICPP-405: RPG bind to local interface

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp 
local_network_interface

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #272


commit 1313bedffd957444dbfd807f26a3d7ac110fdc89
Author: Bin Qiu <benqiu2016@...>
Date:   2018-02-26T15:21:24Z

MINIFICPP-405: RPG bind to local interface




---


[GitHub] nifi-minifi-cpp issue #260: MINIFICPP-382: Implement SUSE release support fo...

2018-02-14 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/260
  
Looks good.


---


[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...

2018-02-06 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r16631
  
--- Diff: extensions/mqtt/ConsumeMQTT.cpp ---
@@ -35,7 +35,7 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum 
receive queue size for the MQTT record", "");
--- End diff --

Fixed in latest commit


---


[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...

2018-02-05 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166130342
  
--- Diff: extensions/mqtt/ConsumeMQTT.cpp ---
@@ -35,7 +35,7 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum 
receive queue size for the MQTT record", "");
--- End diff --

The previous version of the parameter configuration is wrong so that is no 
backward compatible. 


---


[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...

2018-02-05 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166072113
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -119,6 +126,38 @@ void 
AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::Proc
 qos_ = valInt;
 logger_->log_debug("AbstractMQTTProcessor: QOS [%ll]", qos_);
   }
+  value = "";
+
+  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
+if (value == MQTT_SECURITY_PROTOCOL_SSL) {
+  sslEnabled_ = true;
--- End diff --

the SSL handshake will fail and based on the error, user need to config the 
right certs.


---


[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...

2018-02-05 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/259#discussion_r166071814
  
--- Diff: extensions/mqtt/ConsumeMQTT.cpp ---
@@ -35,7 +35,7 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property ConsumeMQTT::MaxQueueSize("Max Flow Segment Size", "Maximum 
flow content payload segment size for the MQTT record", "");
+core::Property ConsumeMQTT::MaxQueueSize("Max Queue Size", "Maximum 
receive queue size for the MQTT record", "");
--- End diff --

we need to correct that because the variable is not right in the first 
place. also if user was using that option, it will default to the default value.


---


[GitHub] nifi-minifi-cpp pull request #259: MINIFICPP-393: Add security support for M...

2018-02-05 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/259

MINIFICPP-393: Add security support for MQTT

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_security

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/259.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #259


commit c5d46bdb9dde46c9c3b8e2bb2f8936c5b59c861d
Author: Bin Qiu <benqiu2016@...>
Date:   2018-02-05T15:32:14Z

MINIFICPP-393: Add security support for MQTT




---


[GitHub] nifi-minifi-cpp pull request #242: MINIFICPP-374: Commit Linux power managem...

2018-01-17 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/242#discussion_r162174433
  
--- Diff: README.md ---
@@ -609,6 +609,21 @@ Additionally, a unique hexadecimal 
uid.minifi.device.segment should be assigned
class: ControllerServiceClass
Properties:
 
+### Linux Power Manager Controller Service
+  The linux power manager controller service can be configured to monitor 
the battery level and status ( discharging or charging ) via the following 
configuration.
+  Simply provide the capacity path and status path along with your 
threshold for the trigger and low battery alarm and you can monitor your 
battery and throttle
+  the threadpools within MiNiFi C++. Note that the name is identified must 
be ThreadPoolManager.
+
+   Controller Services:
+- name: ThreadPoolManager
+  id: 2438e3c8-015a-1000-79ca-83af40ec1888
+  class: LinuxPowerManagerService
+  Properties:
+  Battery Capacity Path: /path/to/battery/capacity
+  Battery Status Path: /path/to/battery/status
--- End diff --

overall looks good. One question I have is why we implement using a 
controller service instead of a processor. It looks like more like processor to 
monitor the standard pro filesystem and perform some user config actions


---


[GitHub] nifi-minifi-cpp issue #236: MINIFICPP-337: make default log directory as log...

2018-01-12 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/236
  
@apiri using MINIFI_HOME/logs. please review


---


[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...

2018-01-12 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/236#discussion_r161296430
  
--- Diff: libminifi/src/core/logging/LoggerConfiguration.cpp ---
@@ -110,6 +112,17 @@ std::shared_ptr 
LoggerConfiguration::initialize_names
   if (!logger_properties->get(appender_key + ".file_name", file_name)) 
{
 file_name = "minifi-app.log";
   }
+  std::string directory = "";
+  if (logger_properties->get(appender_key + ".directory", directory)) {
+// Create the log directory if needed
+struct stat logDirStat;
+if (stat(directory.c_str(), ) != 0 || 
!S_ISDIR(logDirStat.st_mode)) {
+  if (mkdir(directory.c_str(), 0777) == -1) {
--- End diff --

so if you start bin/minifi.sh, the log directory will be ./logs where you 
start the same


---


[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...

2018-01-12 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/236#discussion_r161295854
  
--- Diff: libminifi/src/core/logging/LoggerConfiguration.cpp ---
@@ -110,6 +112,17 @@ std::shared_ptr 
LoggerConfiguration::initialize_names
   if (!logger_properties->get(appender_key + ".file_name", file_name)) 
{
 file_name = "minifi-app.log";
   }
+  std::string directory = "";
+  if (logger_properties->get(appender_key + ".directory", directory)) {
+// Create the log directory if needed
+struct stat logDirStat;
+if (stat(directory.c_str(), ) != 0 || 
!S_ISDIR(logDirStat.st_mode)) {
+  if (mkdir(directory.c_str(), 0777) == -1) {
--- End diff --

we read the directory variable from minifi-log.properties which specify the 
log name and log directory.
the directory can be a absolute path or relative path.



---


[GitHub] nifi-minifi-cpp pull request #236: MINIFICPP-337: make default log directory...

2018-01-10 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/236

MINIFICPP-337: make default log directory as logs

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp MINIFICPP-337

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/236.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #236


commit de8a48c8b15bdec4d5083af2088e5b308faf5ef7
Author: Bin Qiu <benqiu2016@...>
Date:   2018-01-10T16:49:58Z

MINIFICPP-337: make default log directory as logs




---


[GitHub] nifi-minifi-cpp issue #228: MINIFICPP-342: MQTT extension

2018-01-05 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/228
  
@phrocker addressed  comments, please review.


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string , MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken ) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read([0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = [0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), , 
_) != MQTTCLIENT_SUCCESS) {
--- End diff --

it add the MQTT header and call socket write.
the deliverable callback is for QOS.
/**
 * This is a callback function. The client application
 * must provide an implementation of this function to enable asynchronous
 * notification of delivery of messages. The function is registered with the
 * client library by passing it as an argument to MQTTClient_setCallbacks().
 * It is called by the client library after the client application has
 * published a message to the server. It indicates that the necessary
 * handshaking and acknowledgements for the requested quality of service 
(see
 * MQTTClient_message.qos) have been completed. This function is executed 
on a
 * separate thread to the one on which the client application is running.
 * Note:MQTTClient_deliveryComplete() is not called when messages are
 * published at QoS0.
 * @param context A pointer to the context value originally passed to
 * MQTTClient_setCallbacks(), which contains any application-specific 
context.
 * @p

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line 
tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION 
${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build 
ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML 
based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

will remove


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

will remove


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage();
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr stream) {
+  int64_t len = 
stream->write(reinterpret_cast<uint8_t*>(message_->payload), 
message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when 
creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr 
, const std::shared_ptr );
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque _queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

will do


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+
logger_(logging::LoggerFactory::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

it should be. only side effect is if app halt and did not unsub. The broker 
will buffer the msgs to the topic while the client is go away. if the app 
restart with the same topic and same client ID, it will rx all msg buffer for 
that client buffered in the broker.



---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty

[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to 
use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", 
"Whether to start afresh or resume previous flows. See the allowable value 
descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client 
ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to 
use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive 
Interval", "Defines the maximum time interval between messages sent or 
received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection 
Timeout", "Maximum time interval the client will wait for the network 
connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The 
Quality of Service(QoS) to send the message with. Accepts three values '0', '1' 
and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish 
the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles 
that are sent successfully to the destination are transferred to this 
relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles 
that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty

[GitHub] nifi-minifi-cpp issue #227: MINIFICPP-355: Resolve issue with 32-bit systems...

2018-01-04 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/227
  
looks good, can we merge into 1 commit


---


[GitHub] nifi-minifi-cpp pull request #228: MINIFICPP-342: MQTT extension

2018-01-04 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228

MINIFICPP-342: MQTT extension

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_dev

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #228


commit cc47483e04164102b2382a78bf68d10f4e8f5efe
Author: Bin Qiu <benqiu2016@...>
Date:   2018-01-04T16:23:14Z

MINIFICPP-342: MQTT extension




---


[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api

2017-12-18 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/217
  
yes, +1


---


[GitHub] nifi-minifi-cpp issue #216: MINIFICPP-341 Add CentOS6 build instructions

2017-12-15 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/216
  
please resolve conflict


---


[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api

2017-12-15 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/217
  
please merge these two commit


---


[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api

2017-12-13 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/217
  
looks good


---


[GitHub] nifi-minifi-cpp pull request #217: MINIFICPP-41: First iteration of C api

2017-12-11 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/217#discussion_r156161507
  
--- Diff: libminifi/src/capi/api.cpp ---
@@ -0,0 +1,251 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include 
+#include 
+#include 
+#include 
+#include "core/Core.h"
+#include "capi/api.h"
+#include "capi/expect.h"
+#include "capi/Instance.h"
+#include "capi/Plan.h"
+#include "ResourceClaim.h"
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port.
+ */
+nifi_instance *create_instance(char *url, nifi_port *port) {
+  minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
+  nifi_instance *instance = new nifi_instance;
+
+  instance->instance_ptr = new minifi::Instance(url, port->pord_id);
+  instance->port.pord_id = port->pord_id;
+
+  return instance;
+}
+
+/**
+ * Initializes the instance
+ */
+void initialize_instance(nifi_instance *instance) {
+  auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
+  minifi_instance_ref->initialize(instance->port.pord_id);
--- End diff --

same as above, we need to init instance just once.


---


[GitHub] nifi-minifi-cpp pull request #217: MINIFICPP-41: First iteration of C api

2017-12-11 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/217#discussion_r156161257
  
--- Diff: libminifi/src/capi/api.cpp ---
@@ -0,0 +1,251 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include 
+#include 
+#include 
+#include 
+#include "core/Core.h"
+#include "capi/api.h"
+#include "capi/expect.h"
+#include "capi/Instance.h"
+#include "capi/Plan.h"
+#include "ResourceClaim.h"
+
+/**
+ * Creates a NiFi Instance from the url and output port.
+ * @param url http URL for NiFi instance
+ * @param port Remote output port.
+ */
+nifi_instance *create_instance(char *url, nifi_port *port) {
+  minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
--- End diff --

is the instance need to be a singleton to avoid user create instance again 
when try to call the same second time


---


[GitHub] nifi-minifi-cpp issue #188: MINIFICPP-49 Added initial implementation of NiF...

2017-12-01 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/188
  
@achristianson here is conflict. 


---


[GitHub] nifi-minifi-cpp pull request #188: MINIFICPP-49 Added initial implementation...

2017-11-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/188#discussion_r154135525
  
--- Diff: extensions/expression-language/ProcessContextExpr.cpp ---
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+bool ProcessContext::getProperty(const std::string , std::string 
,
+ const std::shared_ptr 
_file) {
+  if (expressions_.find(name) == expressions_.end()) {
--- End diff --

OK.


---


[GitHub] nifi-minifi-cpp issue #188: MINIFICPP-49 Added initial implementation of NiF...

2017-11-30 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/188
  
looks good to me +1


---


[GitHub] nifi-minifi-cpp pull request #188: MINIFICPP-49 Added initial implementation...

2017-11-28 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/188#discussion_r153617015
  
--- Diff: extensions/expression-language/ProcessContextExpr.cpp ---
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+bool ProcessContext::getProperty(const std::string , std::string 
,
+ const std::shared_ptr 
_file) {
+  if (expressions_.find(name) == expressions_.end()) {
--- End diff --

I did not see the code path for EL for property handling. it will make 
sense to change README to illustrate the example use case for EL


---


[GitHub] nifi-minifi-cpp pull request #209: MINIFICPP-329: Change PutKafka to Publish...

2017-11-28 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/209

MINIFICPP-329: Change PutKafka to PublishKafka

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp kafka_refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #209


commit 2bcec3a815077cd330814a9a07f64d8ada01e90a
Author: Bin Qiu <benqiu2...@gmail.com>
Date:   2017-11-28T15:25:15Z

MINIFICPP-329: Change PutKafka to PublishKafka




---


[GitHub] nifi-minifi-cpp issue #183: MINIFICPP-301 Added initial implementation of TF...

2017-11-16 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/183
  
@achristianson please merge to one commit


---


[GitHub] nifi-minifi-cpp issue #182: MINIFICPP-274: PutKafka Processor

2017-11-15 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/182
  
@phrocker addressed the review comments.


---


[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...

2017-11-14 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150886943
  
--- Diff: extensions/tensorflow/TFApplyGraph.cpp ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TFApplyGraph.h"
+
+#include "tensorflow/cc/ops/standard_ops.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property TFApplyGraph::InputNode(  // NOLINT
+"Input Node",
+"The node of the TensorFlow graph to feed tensor inputs to", "");
+core::Property TFApplyGraph::OutputNode(  // NOLINT
+"Output Node",
+"The node of the TensorFlow graph to read tensor outputs from", "");
+
+core::Relationship TFApplyGraph::Success(  // NOLINT
+"success",
+"Successful graph application outputs");
+core::Relationship TFApplyGraph::Retry(  // NOLINT
+"retry",
+"Inputs which fail graph application but may work if sent again");
+core::Relationship TFApplyGraph::Failure(  // NOLINT
+"failure",
+"Failures which will not work if retried");
+
+void TFApplyGraph::initialize() {
+  std::set properties;
+  properties.insert(InputNode);
+  properties.insert(OutputNode);
+  setSupportedProperties(std::move(properties));
+
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Retry);
+  relationships.insert(Failure);
+  setSupportedRelationships(std::move(relationships));
+}
+
+void TFApplyGraph::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(InputNode.getName(), input_node_);
+
+  if (input_node_.empty()) {
+logger_->log_error("Invalid input node");
+  }
+
+  context->getProperty(OutputNode.getName(), output_node_);
+
+  if (output_node_.empty()) {
+logger_->log_error("Invalid output node");
+  }
+}
+
+void TFApplyGraph::onTrigger(const std::shared_ptr 
,
+ const std::shared_ptr 
) {
+  auto flow_file = session->get();
--- End diff --

we overwrite the flow input with output, do we have a use case that we need 
to keep input and create a new flow for output.


---


[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...

2017-11-14 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150886217
  
--- Diff: extensions/tensorflow/TFApplyGraph.h ---
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_TFAPPLYGRAPH_H
+#define NIFI_MINIFI_CPP_TFAPPLYGRAPH_H
+
+#include 
+
+#include 
+#include 
+#include 
+#include 
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class TFApplyGraph : public core::Processor {
+ public:
+  explicit TFApplyGraph(const std::string , uuid_t uuid = nullptr)
+  : Processor(name, uuid),
+logger_(logging::LoggerFactory::getLogger()) {
+  }
+
+  static void hello();
--- End diff --

unused hello


---


[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...

2017-11-14 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150878519
  
--- Diff: extensions/tensorflow/TFApplyGraph.cpp ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TFApplyGraph.h"
+
+#include "tensorflow/cc/ops/standard_ops.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property TFApplyGraph::InputNode(  // NOLINT
+"Input Node",
+"The node of the TensorFlow graph to feed tensor inputs to", "");
+core::Property TFApplyGraph::OutputNode(  // NOLINT
+"Output Node",
+"The node of the TensorFlow graph to read tensor outputs from", "");
+
+core::Relationship TFApplyGraph::Success(  // NOLINT
+"success",
+"Successful graph application outputs");
+core::Relationship TFApplyGraph::Retry(  // NOLINT
+"retry",
+"Inputs which fail graph application but may work if sent again");
+core::Relationship TFApplyGraph::Failure(  // NOLINT
+"failure",
+"Failures which will not work if retried");
+
+void TFApplyGraph::initialize() {
+  std::set properties;
+  properties.insert(InputNode);
+  properties.insert(OutputNode);
+  setSupportedProperties(std::move(properties));
+
+  std::set relationships;
+  relationships.insert(Success);
+  relationships.insert(Retry);
+  relationships.insert(Failure);
+  setSupportedRelationships(std::move(relationships));
+}
+
+void TFApplyGraph::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
+  context->getProperty(InputNode.getName(), input_node_);
+
+  if (input_node_.empty()) {
+logger_->log_error("Invalid input node");
+  }
+
+  context->getProperty(OutputNode.getName(), output_node_);
+
+  if (output_node_.empty()) {
+logger_->log_error("Invalid output node");
+  }
+}
+
+void TFApplyGraph::onTrigger(const std::shared_ptr 
,
+ const std::shared_ptr 
) {
+  auto flow_file = session->get();
+
+  if (!flow_file) {
+return;
+  }
+
+  try {
+// Read graph
+std::string tf_type;
+flow_file->getAttribute("tf.type", tf_type);
+
+std::shared_ptr graph_def;
+uint32_t graph_version;
+
+{
+  std::lock_guard guard(graph_def_mtx_);
+
+  if ("graph" == tf_type) {
--- End diff --

after we reload the graph, do we need to clean the context which associated 
with the old graph


---


[GitHub] nifi-minifi-cpp pull request #181: MINIFICPP-297: Remove Boost dependencies ...

2017-11-14 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/181#discussion_r150865974
  
--- Diff: libminifi/include/utils/file/FileManager.h ---
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_UTILS_FILEMANAGER_H_
+#define LIBMINIFI_INCLUDE_UTILS_FILEMANAGER_H_
+
+#ifdef BOOST_VERSION
+#include 
+#else
+#include 
+#endif
+#include 
+#include 
+#include 
+#include "io/validation.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+#ifdef WIN32
+#define stat _stat
+#endif
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+/**
+ * Simple implementation of simple file manager utilities.
+ *
+ * unique_file is not a static implementation so that we can support scope 
driven temporary files.
+ */
+class FileManager {
+ public:
+
+  FileManager() {
+  }
+
+  ~FileManager() {
+for (auto file : unique_files_) {
+  unlink(file.c_str());
+}
+  }
+  std::string unique_file(const std::string , bool keep = false) {
+
+if (!IsNullOrEmpty(location)) {
+  std::string file_name = location + "/" + 
non_repeating_string_generator_.generate();
+  while (!verify_not_exist(file_name)) {
+file_name = location + "/" + 
non_repeating_string_generator_.generate();
+  }
+  if (!keep)
+unique_files_.push_back(file_name);
--- End diff --

why we need to do unlink later in the destructor


---


[GitHub] nifi-minifi-cpp pull request #183: MINIFICPP-301 Added initial implementatio...

2017-11-14 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/183#discussion_r150858096
  
--- Diff: CMakeLists.txt ---
@@ -155,6 +155,13 @@ if (ENABLE_USB_CAMERA)
 createExtension(USB-CAMERA-EXTENSIONS "USB CAMERA EXTENSIONS" "This 
enables USB camera support" "extensions/usb-camera" 
"${TEST_DIR}/usb-camera-tests" "TRUE" "thirdparty/libuvc-0.0.6")
 endif()
 
+## TensorFlow extensions
+## Disabled by default because TF can be complex/environment-specific to 
build
+option(ENABLE_TENSORFLOW "Disables the TensorFlow extensions." OFF)
+if (ENABLE_TENSORFLOW)
+createExtension(TENSORFLOW-EXTENSIONS "TENSORFLOW EXTENSIONS" "This 
enables TensorFlow support" "extensions/tensorflow" 
"${TEST_DIR}/tensorflow-tests")
--- End diff --

does TF provide source built as third party library. in case we want to 
build for other target like ARM, etc


---


[GitHub] nifi-minifi-cpp pull request #182: MINIFICPP-274: PutKafka Processor

2017-11-11 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/182

MINIFICPP-274: PutKafka Processor

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp kafka

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #182


commit a2abdfeaffaf86ea9dbddae5193988e956829618
Author: Bin Qiu <benqiu2...@gmail.com>
Date:   2017-11-10T16:50:04Z

MINIFICPP-274: PutKafka Processor




---


[GitHub] nifi-minifi-cpp issue #172: MINIFI-218: GetGPS processor implementation

2017-11-09 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/172
  
@phrocker looks good, please merge the two commits into one.


---


[GitHub] nifi-minifi-cpp issue #173: MINIFICPP-284 Handle processor configuration err...

2017-11-03 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/173
  
looks good.


---


[GitHub] nifi-minifi-cpp issue #158: MINIFICPP-60: Add initial implementation of Site...

2017-10-31 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/158
  
@phrocker please resolve the conflict and merge into simple commit, update 
readme for the same.


---


[GitHub] nifi-minifi-cpp pull request #162: MINIFICPP-273 Simplify configuration of M...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/162#discussion_r147889510
  
--- Diff: libminifi/src/core/yaml/YamlConfiguration.cpp ---
@@ -698,6 +703,34 @@ void YamlConfiguration::checkRequiredField(YAML::Node 
*yamlNode, const std::stri
   }
 }
 
+YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode,
+   const std::string 
,
+   const YAML::Node 
,
+   const std::string 
,
+   const std::string 
) {
+  std::string infoMessage = providedInfoMessage;
+  auto result = yamlNode->as()[fieldName];
+  if (!result) {
+if (infoMessage.empty()) {
+  // Build a helpful info message for the user to inform them that a 
default is being used
+  infoMessage =
+  yamlNode->as()["name"] ?
+  "Using default value for optional field '" + fieldName + "' in 
component named '"
+  + yamlNode->as()["name"].as() + "'" 
:
+  "Using default value for optional field '" + fieldName + "' ";
+  if (!yamlSection.empty()) {
+infoMessage += " [in '" + yamlSection + "' section of 
configuration file]: ";
+  }
+
+  infoMessage += defaultValue.as();
+}
+logger_->log_info(infoMessage.c_str());
+result = defaultValue;
--- End diff --

no return of the result. please run make linter


---


[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837447
  
--- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp ---
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPProtocol.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "PeersEntity.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+std::shared_ptr HttpSiteToSiteClient::id_generator_ = 
utils::IdGenerator::getIdGenerator();
+
+const std::string HttpSiteToSiteClient::parseTransactionId(const 
std::string ) {
+  int i = 0;
+  for (i = uri.length() - 1; i >= 0; i--) {
+if (uri.at(i) == '/')
+  break;
+  }
+  return uri.substr(i + 1, uri.length() - (i + 1));
+}
+
+std::shared_ptr 
HttpSiteToSiteClient::createTransaction(std::string , 
TransferDirection direction) {
+  std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
+  std::stringstream uri;
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() 
<< "/transactions";
+  auto client = create_http_client(uri.str(), "POST");
+
+  client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+  client->setConnectionTimeout(5);
+
+  client->setContentType("application/json");
+  client->appendHeader("Accept: application/json");
+  client->setUseChunkedEncoding();
+  client->setPostFields("");
+  client->submit();
+  peer_->setStream(nullptr);
+  if (client->getResponseCode() == 201) {
+// parse the headers
+auto headers = client->getParsedHeaders();
+auto intent_name = headers["x-location-uri-intent"];
+if (intent_name == "transaction-url") {
+  auto url = headers["Location"];
+
+  if (IsNullOrEmpty()) {
+logger_->log_debug("Location is empty");
+  } else {
+
+org::apache::nifi::minifi::io::CRCStream 
crcstream(peer_.get());
+auto transaction = std::make_shared(direction, 
crcstream);
+transaction->initialize(this, url);
+auto transactionId = parseTransactionId(url);
+if (IsNullOrEmpty(transactionId))
+  return nullptr;
+transaction->setTransactionId(transactionId);
+std::shared_ptr client;
+if (transaction->getDirection() == SEND) {
+  client = openConnectionForSending(transaction);
+} else {
+  client = openConnectionForReceive(transaction);
+  transaction->setDataAvailable(true);
+  // a 201 tells us that data is available. A 200 would mean that 
nothing is available.
+}
+
+client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+peer_->setStream(std::unique_ptr(new 
io::HttpStream(client)));
+transactionID = transaction->getUUIDStr();
+logger_->log_debug("Created transaction id -%s-", transactionID);
+known_transactions_[transaction->getUUIDStr()] = transaction;
+return transaction;
+  }
+} else {
+  logger_->log_debug("Could not create transaction, intent is %s", 
intent_name);
+}
+  } else {
+logger_->log_debug("Could not create transaction, received %d", 
client->getResponseCode());
+  }
+  return nullptr;
+}
+
+int HttpSiteToSi

[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837310
  
--- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp ---
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPProtocol.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "PeersEntity.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+std::shared_ptr HttpSiteToSiteClient::id_generator_ = 
utils::IdGenerator::getIdGenerator();
+
+const std::string HttpSiteToSiteClient::parseTransactionId(const 
std::string ) {
+  int i = 0;
+  for (i = uri.length() - 1; i >= 0; i--) {
+if (uri.at(i) == '/')
+  break;
+  }
+  return uri.substr(i + 1, uri.length() - (i + 1));
+}
+
+std::shared_ptr 
HttpSiteToSiteClient::createTransaction(std::string , 
TransferDirection direction) {
+  std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
+  std::stringstream uri;
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() 
<< "/transactions";
+  auto client = create_http_client(uri.str(), "POST");
+
+  client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+  client->setConnectionTimeout(5);
+
+  client->setContentType("application/json");
+  client->appendHeader("Accept: application/json");
+  client->setUseChunkedEncoding();
+  client->setPostFields("");
+  client->submit();
+  peer_->setStream(nullptr);
+  if (client->getResponseCode() == 201) {
+// parse the headers
+auto headers = client->getParsedHeaders();
+auto intent_name = headers["x-location-uri-intent"];
+if (intent_name == "transaction-url") {
+  auto url = headers["Location"];
+
+  if (IsNullOrEmpty()) {
+logger_->log_debug("Location is empty");
+  } else {
+
+org::apache::nifi::minifi::io::CRCStream 
crcstream(peer_.get());
+auto transaction = std::make_shared(direction, 
crcstream);
+transaction->initialize(this, url);
+auto transactionId = parseTransactionId(url);
+if (IsNullOrEmpty(transactionId))
+  return nullptr;
+transaction->setTransactionId(transactionId);
+std::shared_ptr client;
+if (transaction->getDirection() == SEND) {
+  client = openConnectionForSending(transaction);
+} else {
+  client = openConnectionForReceive(transaction);
+  transaction->setDataAvailable(true);
+  // a 201 tells us that data is available. A 200 would mean that 
nothing is available.
+}
+
+client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+peer_->setStream(std::unique_ptr(new 
io::HttpStream(client)));
+transactionID = transaction->getUUIDStr();
+logger_->log_debug("Created transaction id -%s-", transactionID);
+known_transactions_[transaction->getUUIDStr()] = transaction;
+return transaction;
+  }
+} else {
+  logger_->log_debug("Could not create transaction, intent is %s", 
intent_name);
+}
+  } else {
+logger_->log_debug("Could not create transaction, received %d", 
client->getResponseCode());
+  }
+  return nullptr;
+}
+
+int HttpSiteToSi

[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147837119
  
--- Diff: extensions/http-curl/sitetosite/HTTPProtocol.cpp ---
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "HTTPProtocol.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "PeersEntity.h"
+#include "io/CRCStream.h"
+#include "sitetosite/Peer.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace sitetosite {
+
+std::shared_ptr HttpSiteToSiteClient::id_generator_ = 
utils::IdGenerator::getIdGenerator();
+
+const std::string HttpSiteToSiteClient::parseTransactionId(const 
std::string ) {
+  int i = 0;
+  for (i = uri.length() - 1; i >= 0; i--) {
+if (uri.at(i) == '/')
+  break;
+  }
+  return uri.substr(i + 1, uri.length() - (i + 1));
+}
+
+std::shared_ptr 
HttpSiteToSiteClient::createTransaction(std::string , 
TransferDirection direction) {
+  std::string dir_str = direction == SEND ? "input-ports" : "output-ports";
+  std::stringstream uri;
+  uri << getBaseURI() << "data-transfer/" << dir_str << "/" << getPortId() 
<< "/transactions";
+  auto client = create_http_client(uri.str(), "POST");
+
+  client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+
+  client->setConnectionTimeout(5);
+
+  client->setContentType("application/json");
+  client->appendHeader("Accept: application/json");
+  client->setUseChunkedEncoding();
+  client->setPostFields("");
+  client->submit();
+  peer_->setStream(nullptr);
+  if (client->getResponseCode() == 201) {
+// parse the headers
+auto headers = client->getParsedHeaders();
+auto intent_name = headers["x-location-uri-intent"];
+if (intent_name == "transaction-url") {
+  auto url = headers["Location"];
+
+  if (IsNullOrEmpty()) {
+logger_->log_debug("Location is empty");
+  } else {
+
+org::apache::nifi::minifi::io::CRCStream 
crcstream(peer_.get());
+auto transaction = std::make_shared(direction, 
crcstream);
+transaction->initialize(this, url);
+auto transactionId = parseTransactionId(url);
+if (IsNullOrEmpty(transactionId))
+  return nullptr;
+transaction->setTransactionId(transactionId);
+std::shared_ptr client;
+if (transaction->getDirection() == SEND) {
+  client = openConnectionForSending(transaction);
+} else {
+  client = openConnectionForReceive(transaction);
+  transaction->setDataAvailable(true);
+  // a 201 tells us that data is available. A 200 would mean that 
nothing is available.
+}
+
+client->appendHeader(PROTOCOL_VERSION_HEADER, "1");
+peer_->setStream(std::unique_ptr(new 
io::HttpStream(client)));
+transactionID = transaction->getUUIDStr();
+logger_->log_debug("Created transaction id -%s-", transactionID);
+known_transactions_[transaction->getUUIDStr()] = transaction;
+return transaction;
+  }
+} else {
+  logger_->log_debug("Could not create transaction, intent is %s", 
intent_name);
+}
+  } else {
+logger_->log_debug("Could not create transaction, received %d", 
client->getResponseCode());
+  }
+  return nullptr;
+}
+
+int HttpSiteToSi

[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147747069
  
--- Diff: extensions/http-curl/client/HTTPCallback.h ---
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+  HttpStreamingCallback()
+  : ptr(nullptr),
+is_alive_(true) {
+previous_pos_ = 0;
+rolling_count_ = 0;
+  }
+
+  virtual ~HttpStreamingCallback() {
+
+  }
+
+  void close() {
+is_alive_ = false;
+cv.notify_all();
+  }
+
+  virtual void seek(size_t pos) {
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+  }
+
+  virtual int64_t process(std::shared_ptr stream) {
+
+std::vector vec;
+
+if (stream->getSize() > 0) {
+  vec.resize(stream->getSize());
+
+  stream->readData(reinterpret_cast<uint8_t*>(vec.data()), 
stream->getSize());
+}
+
+size_t added_size = vec.size();
+
+byte_arrays_.enqueue(std::move(vec));
+
+cv.notify_all();
+
+return added_size;
+
+  }
+
+  virtual int64_t process(uint8_t *vector, size_t size) {
+
+std::vector vec;
+
+if (size > 0) {
+  vec.resize(size);
+
+  memcpy(vec.data(), vector, size);
+
+  size_t added_size = vec.size();
+
+  byte_arrays_.enqueue(std::move(vec));
+
+  cv.notify_all();
+
+  return added_size;
+} else {
+  return 0;
+}
+
+  }
+
+  virtual void write(std::string content) {
+std::vector vec;
+vec.assign(content.begin(), content.end());
+byte_arrays_.enqueue(vec);
+  }
+
+  virtual char *getBuffer(size_t pos) {
+
+// if there is no space remaining in our current buffer,
+// we should load the next. If none exists after that we have no more 
buffer
+std::lock_guard lock(mutex_);
+
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+
+if (ptr == nullptr)
+  return nullptr;
+
+size_t absolute_position = pos - previous_pos_;
+
+current_pos_ = pos;
+for (int i = 0; i < current_vec_.size(); i++) {
+}
+
+return ptr + absolute_position;
+  }
+
+  virtual const size_t getRemaining(size_t pos) {
+return current_vec_.size();
+  }
+
+  virtual const size_t getBufferSize() {
+std::lock_guard lock(mutex_);
+
+if (ptr == nullptr || current_pos_ >= rolling_count_) {
+  load_buffer();
+}
+return rolling_count_;
+  }
+
+ private:
+
+  inline void load_buffer() {
+std::unique_lock lock(mutex_);
+cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || 
is_alive_==false;});
+if (!is_alive_ && byte_arrays_.size_approx() == 0) {
+  return;
--- End diff --

i thought after wait fired, we own the lock, that's reason you did unlock 
afterwards.


---


[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147739186
  
--- Diff: extensions/http-curl/client/HTTPCallback.h ---
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+  HttpStreamingCallback()
+  : ptr(nullptr),
+is_alive_(true) {
+previous_pos_ = 0;
+rolling_count_ = 0;
+  }
+
+  virtual ~HttpStreamingCallback() {
+
+  }
+
+  void close() {
+is_alive_ = false;
+cv.notify_all();
+  }
+
+  virtual void seek(size_t pos) {
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+  }
+
+  virtual int64_t process(std::shared_ptr stream) {
+
+std::vector vec;
+
+if (stream->getSize() > 0) {
+  vec.resize(stream->getSize());
+
+  stream->readData(reinterpret_cast<uint8_t*>(vec.data()), 
stream->getSize());
+}
+
+size_t added_size = vec.size();
+
+byte_arrays_.enqueue(std::move(vec));
+
+cv.notify_all();
+
+return added_size;
+
+  }
+
+  virtual int64_t process(uint8_t *vector, size_t size) {
+
+std::vector vec;
+
+if (size > 0) {
+  vec.resize(size);
+
+  memcpy(vec.data(), vector, size);
+
+  size_t added_size = vec.size();
+
+  byte_arrays_.enqueue(std::move(vec));
+
+  cv.notify_all();
+
+  return added_size;
+} else {
+  return 0;
+}
+
+  }
+
+  virtual void write(std::string content) {
+std::vector vec;
+vec.assign(content.begin(), content.end());
+byte_arrays_.enqueue(vec);
+  }
+
+  virtual char *getBuffer(size_t pos) {
+
+// if there is no space remaining in our current buffer,
+// we should load the next. If none exists after that we have no more 
buffer
+std::lock_guard lock(mutex_);
+
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+
+if (ptr == nullptr)
+  return nullptr;
+
+size_t absolute_position = pos - previous_pos_;
+
+current_pos_ = pos;
+for (int i = 0; i < current_vec_.size(); i++) {
+}
+
+return ptr + absolute_position;
+  }
+
+  virtual const size_t getRemaining(size_t pos) {
+return current_vec_.size();
+  }
+
+  virtual const size_t getBufferSize() {
+std::lock_guard lock(mutex_);
+
+if (ptr == nullptr || current_pos_ >= rolling_count_) {
+  load_buffer();
+}
+return rolling_count_;
+  }
+
+ private:
+
+  inline void load_buffer() {
+std::unique_lock lock(mutex_);
+cv.wait(lock, [&] {return byte_arrays_.size_approx() > 0 || 
is_alive_==false;});
+if (!is_alive_ && byte_arrays_.size_approx() == 0) {
+  return;
--- End diff --

unlock the mutex before return?


---


[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147738287
  
--- Diff: extensions/http-curl/client/HTTPCallback.h ---
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+  HttpStreamingCallback()
+  : ptr(nullptr),
+is_alive_(true) {
+previous_pos_ = 0;
+rolling_count_ = 0;
+  }
+
+  virtual ~HttpStreamingCallback() {
+
+  }
+
+  void close() {
+is_alive_ = false;
+cv.notify_all();
+  }
+
+  virtual void seek(size_t pos) {
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+  }
+
+  virtual int64_t process(std::shared_ptr stream) {
+
+std::vector vec;
+
+if (stream->getSize() > 0) {
+  vec.resize(stream->getSize());
+
+  stream->readData(reinterpret_cast<uint8_t*>(vec.data()), 
stream->getSize());
+}
+
+size_t added_size = vec.size();
+
+byte_arrays_.enqueue(std::move(vec));
+
+cv.notify_all();
+
+return added_size;
+
+  }
+
+  virtual int64_t process(uint8_t *vector, size_t size) {
+
+std::vector vec;
+
+if (size > 0) {
+  vec.resize(size);
+
+  memcpy(vec.data(), vector, size);
+
+  size_t added_size = vec.size();
+
+  byte_arrays_.enqueue(std::move(vec));
+
+  cv.notify_all();
+
+  return added_size;
+} else {
+  return 0;
+}
+
+  }
+
+  virtual void write(std::string content) {
+std::vector vec;
+vec.assign(content.begin(), content.end());
+byte_arrays_.enqueue(vec);
--- End diff --

same as above


---


[GitHub] nifi-minifi-cpp pull request #158: MINIFICPP-60: Add initial implementation ...

2017-10-30 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/158#discussion_r147738083
  
--- Diff: extensions/http-curl/client/HTTPCallback.h ---
@@ -0,0 +1,187 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+#define EXTENSIONS_HTTP_CURL_CLIENT_HTTPCALLBACK_H_
+
+#include "concurrentqueue.h"
+#include 
+#include 
+#include 
+#include 
+
+#include "utils/ByteArrayCallback.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+/**
+ * will stream as items are processed.
+ */
+class HttpStreamingCallback : public ByteInputCallBack {
+ public:
+  HttpStreamingCallback()
+  : ptr(nullptr),
+is_alive_(true) {
+previous_pos_ = 0;
+rolling_count_ = 0;
+  }
+
+  virtual ~HttpStreamingCallback() {
+
+  }
+
+  void close() {
+is_alive_ = false;
+cv.notify_all();
+  }
+
+  virtual void seek(size_t pos) {
+if ((pos - previous_pos_) >= current_vec_.size() || 
current_vec_.size() == 0)
+  load_buffer();
+  }
+
+  virtual int64_t process(std::shared_ptr stream) {
+
+std::vector vec;
+
+if (stream->getSize() > 0) {
+  vec.resize(stream->getSize());
+
+  stream->readData(reinterpret_cast<uint8_t*>(vec.data()), 
stream->getSize());
+}
+
+size_t added_size = vec.size();
+
+byte_arrays_.enqueue(std::move(vec));
+
+cv.notify_all();
+
+return added_size;
+
+  }
+
+  virtual int64_t process(uint8_t *vector, size_t size) {
+
+std::vector vec;
+
+if (size > 0) {
+  vec.resize(size);
+
+  memcpy(vec.data(), vector, size);
+
+  size_t added_size = vec.size();
+
+  byte_arrays_.enqueue(std::move(vec));
--- End diff --

do we need to lock the mutex_ before enqueue?


---


[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor

2017-10-25 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/151
  
refactor to use lib archive extension.


---


[GitHub] nifi-minifi-cpp issue #154: MINIFICPP-265 Disabled fsanitize=address for now...

2017-10-25 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/154
  
looks good.


---


[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor

2017-10-25 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/151
  
@apiri @phrocker fix the test cast, change size and offset signature to 
uint6_t, it looks like travis passed. please review


---


[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor

2017-10-24 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/151
  
@phrocker the basic class is io::BaseStream, the signature is virtual void 
seek(uint32_t offset) {
the uint64_t break the virtual inheritance.
CompressContext test is running on in my local, it fail in travis, will 
debug more


---


[GitHub] nifi-minifi-cpp issue #151: MINIFICPP-264: CompressContent Processor

2017-10-23 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/151
  
implement the compress content, tested with random 100K payload with format 
gzip, bzip, lzma, xy-lzma. fix the seek issue in stream.


---


[GitHub] nifi-minifi-cpp pull request #151: MINIFICPP-264: CompressContent Processor

2017-10-23 Thread minifirocks
GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/151

MINIFICPP-264: CompressContent Processor

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI- where  is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp compress_content

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #151


commit a2617f78d26e502112106e0fe92395d41c1c303a
Author: Bin Qiu <benqiu2...@gmail.com>
Date:   2017-10-24T03:17:01Z

MINIFICPP-264: CompressContent Processor




---


[GitHub] nifi-minifi-cpp pull request #148: MINIFI-244 Un/FocusArchive processors

2017-10-21 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r146120896
  
--- Diff: libminifi/include/core/ProcessSession.h ---
@@ -151,11 +152,47 @@ class ProcessSession {
   bool keepSource,
   uint64_t offset, char inputDelimiter);
 
+  /**
+   * Exports the data stream to a file
+   * @param string file to export stream to
+   * @param flow flow file
+   * @param bool whether or not to keep the content in the flow file
+   */
+  bool exportContent(const std::string ,
--- End diff --

the mergeContent.h
// Archive Class
class ArchiveMerge {
public: 
do not reply on persistent storage, it use archive_write_open(arch, this, 
NULL, archive_write, NULL); to write the process content in RAM into flowfile.


---


[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...

2017-10-20 Thread minifirocks
Github user minifirocks closed the pull request at:

https://github.com/apache/nifi-minifi-cpp/pull/146


---


[GitHub] nifi-minifi-cpp issue #150: MINIFI-372: Resolve issues with missed commits

2017-10-20 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/150
  
@phrocker please merge these two commits into one


---


[GitHub] nifi-minifi-cpp issue #150: MINIFI-372: Resolve issues with missed commits

2017-10-20 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/150
  
@phrocker looks good.


---


[GitHub] nifi-minifi-cpp issue #146: MINIFICPP-72: Add Tar and Zip Support for MergeC...

2017-10-17 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/146
  
@apiri @phrocker rebased.



---


[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...

2017-10-17 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/147
  
@phrocker merge to apache, please close the PR.


---


[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...

2017-10-17 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/147
  

https://github.com/apache/nifi-minifi-cpp/commit/49ed5094552fe98c289d15168587ff3e63042309


---


[GitHub] nifi-minifi-cpp issue #147: MINIFI-256: Resolve Putfile name and ensure that...

2017-10-17 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/147
  
@phrocker looks good.


---


[GitHub] nifi-minifi-cpp issue #142: MINIFI-372: Replace leveldb with RocksDB

2017-10-16 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/142
  
@phrocker overall looks good, please remove the test and doc and unneeded 
stuff from the rocksdb source.



---


[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...

2017-10-16 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144901439
  
--- Diff: LICENSE ---
@@ -564,4 +564,68 @@ ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
OUT OF OR IN
 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
--- End diff --

@apiri fix the license


---


[GitHub] nifi-minifi-cpp pull request #146: MINIFICPP-72: Add Tar and Zip Support for...

2017-10-16 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144900901
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t size, struct archive *arch, struct archive_entry 
*entry) :
+buffer_size_(size), arch_(arch), entry_(entry) {
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  uint8_t buffer[buffer_size_];
+  int64_t ret = 0;
+  uint64_t read_size;
+  ret = stream->read(buffer, buffer_size_);
+  if (!stream)
+read_size = stream->getSize();
+  else
+read_size = buffer_size_;
+  ret = archive_write_header(arch_, entry_);
+  ret += archive_write_data(arch_, buffer, read_size);
+  return ret;
+}
+uint64_t buffer_size_;
+struct archive *arch_;
+struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(std::string merge_type, 
std::deque<std::shared_ptr> , core::ProcessSession 
*session) :
+merge_type_(merge_type), flows_(flows), session_(session) {
+  size_ = 0;
+  stream_ = nullptr;
+}
+~WriteCallback() {
+}
+
+std::string merge_type_;
+std::deque<std::shared_ptr> _;
+core::ProcessSession *session_;
+std::shared_ptr stream_;
+int64_t size_;
+
+static la_ssize_t archive_write(struct archive *arch, void *context, 
const void *buff, size_t size) {
+  WriteCallback *callback = (WriteCallback *) context;
+  la_ssize_t ret = 
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), 
size);
+  if (ret > 0)
+callback->size_ += (int64_t) ret;
+  return ret;
+}
+
+int64_t process(std::shared_ptr stream) {
+  int64_t ret = 0;
+  struct archive *arch;
+
+  arch = archive_write_new();
+  if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+archive_write_set_format_pax_restricted(arch); // tar format
+  }
+  if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+archive_write_set_format_zip(arch); // zip format
+  }
+  archive_write_set_bytes_per_block(arch, 0);
+  archive_write_add_filter_none(arch);
+  this->stream_ = stream;
+  archive_write_open(arch, this, NULL, archive_write, NULL);
+
+  for (auto flow : flows_) {
+struct archive_entry *entry = archive_entry_new();
+std::string fileName;
+flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+archive_entry_set_pathname(entry, fileName.c_str());
+archive_entry_set_size(entry, flow->getSize());
+archive_entry_set_mode(entry, S_IFREG | 0755);
+if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+  std::string perm;
+  int permInt;
+  if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, 
perm)) {
+try {
+  permInt = std::stoi(perm);
+  archive_entry_set_perm(entry, (mode_t) permInt);
+} catch (...) {
--- End diff --

@apiri @phrocker fixed.


---


[GitHub] nifi-minifi-cpp issue #146: MINIFICPP-72: Add Tar and Zip Support for MergeC...

2017-10-12 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/146
  
@apiri remove test and doc from lib archieve, update LICENSE to add each 
author.


---


[GitHub] nifi-minifi-cpp issue #146: Archive merge

2017-10-12 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/146
  
The incremental read already been added to address Marc comments.


---


[GitHub] nifi-minifi-cpp issue #146: Archive merge

2017-10-11 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/146
  
@phrocker @apiri please let me know if you have comments.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-11 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r144026084
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  
../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include 
../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  
../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include 
../../thirdparty/leveldb-1.18/include 
../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

it is FlowConfiguration.h which include MergeContent.h



---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-10 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912126
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr 
BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext 
*context, core::ProcessSession *session, 
std::deque<std::shared_ptr> , std::string ,
+std::string , std::string ) {
+  std::shared_ptr flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback 
callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, );
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName);
+  }
+  if (!fileName.empty()) {
+fileName += ".tar";
--- End diff --

same as blew zip. we will tar the tar file into a tar ball.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-10 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143912068
  
--- Diff: extensions/http-curl/CMakeLists.txt ---
@@ -24,7 +24,7 @@ find_package(CURL REQUIRED)
 set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
 set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
 
-include_directories(../../libminifi/include ../../libminifi/include/c2  
../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include 
../../thirdparty/leveldb-1.18/include ../../thirdparty/)
+include_directories(../../libminifi/include ../../libminifi/include/c2  
../../libminifi/include/c2/protocols/  ../../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../../libminifi/include/core/yaml  ../../libminifi/include/core  
../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue 
../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include 
../../thirdparty/leveldb-1.18/include 
../../thirdparty/libarchive-3.3.2/libarchive ../../thirdparty/)
--- End diff --

Scanning dependencies of target minifi-http-curl
[ 15%] Building CXX object 
extensions/http-curl/CMakeFiles/minifi-http-curl.dir/HttpCurlLoader.cpp.o
In file included from 
/Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/HttpCurlLoader.cpp:20:
In file included from 
/Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/core/FlowConfiguration.h:37:

/Users/binqiu/m/nifi-minifi-cpp/extensions/http-curl/../../libminifi/include/processors/MergeContent.h:24:10:
 fatal error: 'archive_entry.h' file not found


---


[GitHub] nifi-minifi-cpp issue #146: Archive merge

2017-10-10 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/146
  
@phrocker please review the same
@apiri @jdye64 please review the LICENSE file for libarchive to see whether 
it is OK. Joe raise some concern in last PR.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143552652
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr 
BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext 
*context, core::ProcessSession *session, 
std::deque<std::shared_ptr> , std::string ,
+std::string , std::string ) {
+  std::shared_ptr flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback 
callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, );
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName);
+  }
+  if (!fileName.empty()) {
+fileName += ".tar";
+session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext 
*context, core::ProcessSession *session, 
std::deque<std::shared_ptr> , std::string ,
+std::string , std::string ) {
+  std::shared_ptr flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback 
callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, );
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName);
+  }
+  if (!fileName.empty()) {
+fileName += ".zip";
--- End diff --

if two sep zip files fail into the same bin, we will create a single zip 
file which zip these two zip files.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551961
  
--- Diff: libminifi/src/processors/MergeContent.cpp ---
@@ -276,6 +287,46 @@ std::shared_ptr 
BinaryConcatenationMerge::merge(core::ProcessCon
   return flowFile;
 }
 
+std::shared_ptr TarMerge::merge(core::ProcessContext 
*context, core::ProcessSession *session, 
std::deque<std::shared_ptr> , std::string ,
+std::string , std::string ) {
+  std::shared_ptr flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback 
callback(std::string(MERGE_FORMAT_TAR_VALUE), flows, session);
+  session->write(flowFile, );
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName);
+  }
+  if (!fileName.empty()) {
+fileName += ".tar";
+session->putAttribute(flowFile, FlowAttributeKey(FILENAME), fileName);
+  }
+  return flowFile;
+}
+
+std::shared_ptr ZipMerge::merge(core::ProcessContext 
*context, core::ProcessSession *session, 
std::deque<std::shared_ptr> , std::string ,
+std::string , std::string ) {
+  std::shared_ptr flowFile = std::static_pointer_cast < 
FlowFileRecord > (session->create());
+  ArchiveMerge::WriteCallback 
callback(std::string(MERGE_FORMAT_ZIP_VALUE), flows, session);
+  session->write(flowFile, );
+  session->putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), 
this->getMergedContentType());
+  std::string fileName;
+  flowFile->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  if (flows.size() == 1) {
+flows.front()->getAttribute(FlowAttributeKey(FILENAME), fileName);
+  } else {
+flows.front()->getAttribute(BinFiles::SEGMENT_ORIGINAL_FILENAME, 
fileName);
+  }
+  if (!fileName.empty()) {
+fileName += ".zip";
--- End diff --

yes, we will zip the two separate zip into a larger one


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551465
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t size, struct archive *arch, struct archive_entry 
*entry) :
+buffer_size_(size), arch_(arch), entry_(entry) {
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  uint8_t buffer[buffer_size_];
+  int64_t ret = 0;
+  uint64_t read_size;
+  ret = stream->read(buffer, buffer_size_);
+  if (!stream)
+read_size = stream->getSize();
+  else
+read_size = buffer_size_;
+  ret = archive_write_header(arch_, entry_);
+  ret += archive_write_data(arch_, buffer, read_size);
+  return ret;
+}
+uint64_t buffer_size_;
+struct archive *arch_;
+struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(std::string merge_type, 
std::deque<std::shared_ptr> , core::ProcessSession 
*session) :
+merge_type_(merge_type), flows_(flows), session_(session) {
+  size_ = 0;
+  stream_ = nullptr;
+}
+~WriteCallback() {
+}
+
+std::string merge_type_;
+std::deque<std::shared_ptr> _;
+core::ProcessSession *session_;
+std::shared_ptr stream_;
+int64_t size_;
+
+static la_ssize_t archive_write(struct archive *arch, void *context, 
const void *buff, size_t size) {
+  WriteCallback *callback = (WriteCallback *) context;
+  la_ssize_t ret = 
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), 
size);
+  if (ret > 0)
+callback->size_ += (int64_t) ret;
+  return ret;
+}
+
+int64_t process(std::shared_ptr stream) {
+  int64_t ret = 0;
+  struct archive *arch;
+
+  arch = archive_write_new();
+  if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+archive_write_set_format_pax_restricted(arch); // tar format
+  }
+  if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+archive_write_set_format_zip(arch); // zip format
+  }
+  archive_write_set_bytes_per_block(arch, 0);
+  archive_write_add_filter_none(arch);
+  this->stream_ = stream;
+  archive_write_open(arch, this, NULL, archive_write, NULL);
+
+  for (auto flow : flows_) {
+struct archive_entry *entry = archive_entry_new();
+std::string fileName;
+flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+archive_entry_set_pathname(entry, fileName.c_str());
+archive_entry_set_size(entry, flow->getSize());
+archive_entry_set_mode(entry, S_IFREG | 0755);
+if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+  std::string perm;
+  int permInt;
+  if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, 
perm)) {
+try {
+  permInt = std::stoi(perm);
+  archive_entry_set_perm(entry, (mode_t) permInt);
+} catch (...) {
+}
+  }
+}
+ReadCallback readCb(flow->getSize(), arch, entry);
--- End diff --

will try to do incremental read.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143551088
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t size, struct archive *arch, struct archive_entry 
*entry) :
+buffer_size_(size), arch_(arch), entry_(entry) {
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  uint8_t buffer[buffer_size_];
+  int64_t ret = 0;
+  uint64_t read_size;
+  ret = stream->read(buffer, buffer_size_);
+  if (!stream)
+read_size = stream->getSize();
+  else
+read_size = buffer_size_;
+  ret = archive_write_header(arch_, entry_);
+  ret += archive_write_data(arch_, buffer, read_size);
+  return ret;
+}
+uint64_t buffer_size_;
+struct archive *arch_;
+struct archive_entry *entry_;
+  };
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(std::string merge_type, 
std::deque<std::shared_ptr> , core::ProcessSession 
*session) :
+merge_type_(merge_type), flows_(flows), session_(session) {
+  size_ = 0;
+  stream_ = nullptr;
+}
+~WriteCallback() {
+}
+
+std::string merge_type_;
+std::deque<std::shared_ptr> _;
+core::ProcessSession *session_;
+std::shared_ptr stream_;
+int64_t size_;
+
+static la_ssize_t archive_write(struct archive *arch, void *context, 
const void *buff, size_t size) {
+  WriteCallback *callback = (WriteCallback *) context;
+  la_ssize_t ret = 
callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), 
size);
+  if (ret > 0)
+callback->size_ += (int64_t) ret;
+  return ret;
+}
+
+int64_t process(std::shared_ptr stream) {
+  int64_t ret = 0;
+  struct archive *arch;
+
+  arch = archive_write_new();
+  if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+archive_write_set_format_pax_restricted(arch); // tar format
+  }
+  if (merge_type_ == MERGE_FORMAT_ZIP_VALUE) {
+archive_write_set_format_zip(arch); // zip format
+  }
+  archive_write_set_bytes_per_block(arch, 0);
+  archive_write_add_filter_none(arch);
+  this->stream_ = stream;
+  archive_write_open(arch, this, NULL, archive_write, NULL);
+
+  for (auto flow : flows_) {
+struct archive_entry *entry = archive_entry_new();
+std::string fileName;
+flow->getAttribute(FlowAttributeKey(FILENAME), fileName);
+archive_entry_set_pathname(entry, fileName.c_str());
+archive_entry_set_size(entry, flow->getSize());
+archive_entry_set_mode(entry, S_IFREG | 0755);
+if (merge_type_ == MERGE_FORMAT_TAR_VALUE) {
+  std::string perm;
+  int permInt;
+  if (flow->getAttribute(BinFiles::TAR_PERMISSIONS_ATTRIBUTE, 
perm)) {
+try {
+  permInt = std::stoi(perm);
+  archive_entry_set_perm(entry, (mode_t) permInt);
+} catch (...) {
--- End diff --

will do.


---


[GitHub] nifi-minifi-cpp pull request #146: Archive merge

2017-10-09 Thread minifirocks
Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/146#discussion_r143550805
  
--- Diff: libminifi/include/processors/MergeContent.h ---
@@ -125,6 +127,127 @@ class BinaryConcatenationMerge : public MergeBin {
 };
 
 
+// Archive Class
+class ArchiveMerge {
+public:
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t size, struct archive *arch, struct archive_entry 
*entry) :
+buffer_size_(size), arch_(arch), entry_(entry) {
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr stream) {
+  uint8_t buffer[buffer_size_];
+  int64_t ret = 0;
+  uint64_t read_size;
+  ret = stream->read(buffer, buffer_size_);
+  if (!stream)
--- End diff --

!stream means that we read EOF after the operation, in this case, we did 
not get the full size.


---


  1   2   >