[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320084#comment-16320084
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi-minifi-cpp/pull/228


> MQTT framework
> --
>
> Key: MINIFICPP-342
> URL: https://issues.apache.org/jira/browse/MINIFICPP-342
> Project: NiFi MiNiFi C++
>  Issue Type: New Feature
>Affects Versions: 0.3.0
>Reporter: bqiu
>Assignee: bqiu
> Fix For: 0.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314236#comment-16314236
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/228
  
@phrocker addressed comments, please review.




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312453#comment-16312453
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807097
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+    ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+    ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+    SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+    SET(CPACK_GENERATOR "DEB")
+    CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+        ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+    SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+    ADD_SUBDIRECTORY(debian)
+ELSE()
+    SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

will remove




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312451#comment-16312451
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
+    retain_ = false;
+    max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client,
+        int qos, bool retain, MQTTClient_deliveryToken &token) :
+        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client),
+        qos_(qos), retain_(retain), token_(token) {
+      status_ = 0;
+      read_size_ = 0;
+    }
+    ~ReadCallback() {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      if (flow_size_ < max_seg_size_)
+        max_seg_size_ = flow_size_;
+      std::vector<unsigned char> buffer;
+      buffer.reserve(max_seg_size_);
+      read_size_ = 0;
+      status_ = 0;
+      while (read_size_ < flow_size_) {
+        int readRet = stream->read(&buffer[0], max_seg_size_);
+        if (readRet < 0) {
+          status_ = -1;
+          return read_size_;
+        }
+        if (readRet > 0) {
+          MQTTClient_message pubmsg = MQTTClient_message_initializer;
+          pubmsg.payload = &buffer[0];
+          pubmsg.payloadlen = readRet;
+          pubmsg.qos = qos_;
+          pubmsg.retained = retain_;
+          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
--- End diff --
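
The segmenting loop quoted above can be sketched without the Paho or MiNiFi headers, so that it compiles on its own. This is only an illustration under stated assumptions: publishInSegments and PublishFn are hypothetical names, PublishFn stands in for the call to MQTTClient_publishMessage(), and the flow content is assumed to be in memory already rather than read from a stream.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for MQTTClient_publishMessage(): returns true on success.
using PublishFn = std::function<bool(const std::uint8_t* data, std::size_t len)>;

// Hands `payload` to `publish` in segments of at most `max_seg_size` bytes.
// Returns the number of bytes published; stops at the first failed segment.
std::size_t publishInSegments(const std::vector<std::uint8_t>& payload,
                              std::size_t max_seg_size,
                              const PublishFn& publish) {
  assert(max_seg_size > 0);
  std::size_t sent = 0;
  while (sent < payload.size()) {
    // The last segment may be shorter than max_seg_size.
    const std::size_t len = std::min(max_seg_size, payload.size() - sent);
    if (!publish(payload.data() + sent, len)) {
      break;
    }
    sent += len;
  }
  return sent;
}
```

As in the quoted diff, each segment goes out as its own MQTT message, so a subscriber sees one message per segment rather than one message per flow file.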

It adds the MQTT header and calls the socket write.
The delivery-complete callback is for QoS:
/**
 * This is a callback function. The client application
 * must provide an implementation of this function to enable asynchronous
 * notification of delivery of messages. The function is registered with the
 * client library by passing it as an argument to MQTTClient_setCallbacks().
 * It is called by the client library after the client application has
 * published a message to the server. It indicates that the necessary
 * handshaking and acknowledgements for the requested quality of service (see
 * MQTTClient_message.qos) have been completed. This function is executed on a
 * separate thread to the one on which the client application is running.
 * Note: MQTTClient_deliveryComplete() is not called when messages are
 * published at QoS0.
 * @param context A pointer to the context value originally 
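
The QoS-dependent notification described in that header comment can be illustrated with a toy model. This is not the Paho API: ToyClient, publish() and completeHandshakes() are hypothetical names, and the model is single-threaded, whereas the real callback runs on a separate thread.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Toy stand-in for an MQTT client; this is NOT the Paho API.
class ToyClient {
 public:
  using DeliveryCallback = std::function<void(int token)>;

  explicit ToyClient(DeliveryCallback on_delivered)
      : on_delivered_(std::move(on_delivered)) {}

  // "Publishes" one message and returns its delivery token right away.
  // Only QoS 1 and QoS 2 messages wait for a broker acknowledgement.
  int publish(int qos) {
    assert(qos >= 0 && qos <= 2);
    const int token = next_token_++;
    if (qos > 0) {
      awaiting_ack_.push_back(token);
    }
    return token;
  }

  // Simulates the broker handshake completing for every pending message.
  void completeHandshakes() {
    std::vector<int> done;
    done.swap(awaiting_ack_);
    for (int token : done) {
      on_delivered_(token);
    }
  }

 private:
  DeliveryCallback on_delivered_;
  std::vector<int> awaiting_ack_;
  int next_token_ = 1;
};
```

In this model a QoS 0 publish returns a token but never triggers the callback, which mirrors the note that MQTTClient_deliveryComplete() is not called for messages published at QoS0.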

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312452#comment-16312452
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807086
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

will remove




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312444#comment-16312444
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806405
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+    : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
+    isSubscriber_ = true;
+    maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+    std::lock_guard < std::mutex > lock(mutex_);
+    while (!queue_.empty()) {
+      MQTTClient_message *message = queue_.front();
+      MQTTClient_freeMessage(&message);
+      queue_.pop_front();
+    }
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+    WriteCallback(MQTTClient_message *message) :
+      message_(message) {
+      status_ = 0;
+    }
+    MQTTClient_message *message_;
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
+      if (len < 0)
+        status_ = -1;
+      return len;
+    }
+    int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
+    std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

will do




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312443#comment-16312443
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159806371
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+      : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
+    client_ = nullptr;
+    cleanSession_ = false;
+    keepAliveInterval_ = 60;
+    connectionTimeOut_ = 30;
+    qos_ = 0;
+    isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+    if (isSubscriber_) {
+      MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

It should be. The only side effect arises if the app halts without 
unsubscribing: the broker will buffer messages for the topic while the client 
is away, and if the app restarts with the same topic and the same client ID, it 
will receive all the messages the broker buffered for that client.
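
The buffering behaviour described in this reply can be sketched with a toy in-memory broker. Everything here is a hypothetical model rather than a real MQTT implementation: ToyBroker and its methods are invented for illustration, delivery to online clients is not modelled, and a real broker would typically queue only QoS 1 and QoS 2 messages for a session that was opened without the clean-session flag.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Toy in-memory broker; a hypothetical model, not a real MQTT implementation.
class ToyBroker {
 public:
  // Connects a client. With clean_session == true any stored state is dropped.
  // Returns the messages that were buffered while the client was away.
  std::vector<std::string> connect(const std::string& client_id, bool clean_session) {
    Session& s = sessions_[client_id];
    if (clean_session) {
      s = Session{};
    }
    s.online = true;
    std::vector<std::string> backlog;
    backlog.swap(s.buffered);
    return backlog;
  }

  void subscribe(const std::string& client_id, const std::string& topic) {
    sessions_[client_id].topics.insert(topic);
  }

  void unsubscribe(const std::string& client_id, const std::string& topic) {
    sessions_[client_id].topics.erase(topic);
  }

  // Models an abrupt halt: the client goes away without unsubscribing.
  void drop(const std::string& client_id) { sessions_[client_id].online = false; }

  // Buffers the payload for every offline client still subscribed to the topic.
  void publish(const std::string& topic, const std::string& payload) {
    for (auto& entry : sessions_) {
      Session& s = entry.second;
      if (!s.online && s.topics.count(topic) > 0) {
        s.buffered.push_back(payload);
      }
    }
  }

 private:
  struct Session {
    bool online = false;
    std::set<std::string> topics;
    std::vector<std::string> buffered;
  };
  std::map<std::string, Session> sessions_;
};
```

A client that calls unsubscribe() before going away accumulates no backlog in this model, which is the case the destructor's MQTTClient_unsubscribe() call covers.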





[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312438#comment-16312438
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159805956
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+    uri_ = value;
+    logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+    clientID_ = value;
+    logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+    topic_ = value;
+    logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312427#comment-16312427
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804421
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312426#comment-16312426
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user minifirocks commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159804409
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312326#comment-16312326
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794850
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@
+/**
+ * @file AbstractMQTTProcessor.cpp
+ * AbstractMQTTProcessor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "AbstractMQTTProcessor.h"
+#include 
+#include 
+#include 
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", "");
+core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. See the allowable value descriptions for more details", "true");
+core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", "");
+core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", "");
+core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec");
+core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec");
+core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0");
+core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", "");
+core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship");
+core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship");
+
+void AbstractMQTTProcessor::initialize() {
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(BrokerURL);
+  properties.insert(CleanSession);
+  properties.insert(ClientID);
+  properties.insert(UserName);
+  properties.insert(PassWord);
+  properties.insert(KeepLiveInterval);
+  properties.insert(ConnectionTimeOut);
+  properties.insert(QOS);
+  properties.insert(Topic);
+  setSupportedProperties(properties);
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+void AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
+  std::string value;
+  int64_t valInt;
+  value = "";
+  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
+uri_ = value;
+logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
+  }
+  value = "";
+  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
+clientID_ = value;
+logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_);
+  }
+  value = "";
+  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
+topic_ = value;
+logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_);
+  }
+  value = "";
+  if 

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312320#comment-16312320
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159793508
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.h ---
@@ -0,0 +1,154 @@
+/**
+ * @file AbstractMQTTProcessor.h
+ * AbstractMQTTProcessor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ABSTRACTMQTT_H__
+#define __ABSTRACTMQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+
+#define MQTT_QOS_0 "0"
+#define MQTT_QOS_1 "1"
+#define MQTT_QOS_2 "2"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// AbstractMQTTProcessor Class
+class AbstractMQTTProcessor : public core::Processor {
+ public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit AbstractMQTTProcessor(std::string name, uuid_t uuid = NULL)
+  : core::Processor(name, uuid),
+        logger_(logging::LoggerFactory<AbstractMQTTProcessor>::getLogger()) {
+client_ = nullptr;
+cleanSession_ = false;
+keepAliveInterval_ = 60;
+connectionTimeOut_ = 30;
+qos_ = 0;
+isSubscriber_ = false;
+  }
+  // Destructor
+  virtual ~AbstractMQTTProcessor() {
+if (isSubscriber_) {
+  MQTTClient_unsubscribe(client_, topic_.c_str());
--- End diff --

What happens if unsubscribe is not called because of a failure? Is that
eventually okay?
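
One way to frame the question above is a minimal sketch, assuming a
hypothetical client wrapper whose unsubscribe() and disconnect() return 0 on
success (the same convention as MQTTCLIENT_SUCCESS in the Paho C client).
Nothing below comes from the pull request: teardown, TeardownResult and
FakeClient are illustrative names only. The idea is that cleanup run from a
destructor attempts every step, records failures instead of throwing, and
still closes the connection. Under MQTT 3.1.1 a clean session is discarded by
the broker on disconnect, so a failed unsubscribe should matter mainly when a
persistent session is in use.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical result record; not part of the PR or of the Paho API.
struct TeardownResult {
  bool unsubscribed = false;
  bool disconnected = false;
  std::vector<std::string> errors;
};

// Attempts every cleanup step and never throws, so it can be called from a
// destructor. Client is any type exposing unsubscribe(topic) and
// disconnect(), each returning 0 on success (an assumption of this sketch).
template <typename Client>
TeardownResult teardown(Client &client, const std::string &topic,
                        bool is_subscriber) noexcept {
  TeardownResult result;
  try {
    if (is_subscriber) {
      const int rc = client.unsubscribe(topic);
      result.unsubscribed = (rc == 0);
      if (rc != 0) {
        result.errors.push_back("unsubscribe failed, rc=" + std::to_string(rc));
      }
    }
    // Disconnect regardless of whether unsubscribe succeeded.
    const int rc = client.disconnect();
    result.disconnected = (rc == 0);
    if (rc != 0) {
      result.errors.push_back("disconnect failed, rc=" + std::to_string(rc));
    }
  } catch (...) {
    // Swallowed on purpose: a destructor must not propagate exceptions.
  }
  return result;
}

// Test double standing in for a real MQTT client.
struct FakeClient {
  int unsubscribe_rc = 0;
  int disconnect_calls = 0;
  int unsubscribe(const std::string &) { return unsubscribe_rc; }
  int disconnect() {
    ++disconnect_calls;
    return 0;
  }
};
```

With a FakeClient whose unsubscribe_rc is nonzero, teardown still calls
disconnect() once and reports the failure in errors, which the caller can log.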




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312318#comment-16312318
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792117
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312322#comment-16312322
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792503
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312319#comment-16312319
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523
  
--- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp ---
@@ -0,0 +1,158 @@

[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312324#comment-16312324
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794541
  
--- Diff: thirdparty/paho.mqtt.c/.travis.yml ---
@@ -0,0 +1,47 @@
+sudo: true
--- End diff --

This file doesn't seem to be needed.




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312321#comment-16312321
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794693
  
--- Diff: thirdparty/paho.mqtt.c/CMakeLists.txt ---
@@ -0,0 +1,86 @@

+#***
+#  Copyright (c) 2015, 2017 logi.cals GmbH and others
+#
+#  All rights reserved. This program and the accompanying materials
+#  are made available under the terms of the Eclipse Public License v1.0
+#  and Eclipse Distribution License v1.0 which accompany this distribution.
+#
+#  The Eclipse Public License is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#  and the Eclipse Distribution License is available at
+#http://www.eclipse.org/org/documents/edl-v10.php.
+#
+#  Contributors:
+# Rainer Poisel - initial version
+# Genis Riera Perez - Add support for building debian package

+#***/
+
+# Note: on OS X you should install XCode and the associated command-line tools
+
+CMAKE_MINIMUM_REQUIRED(VERSION 2.8.4)
+PROJECT("paho" C)
+MESSAGE(STATUS "CMake version: " ${CMAKE_VERSION})
+MESSAGE(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
+
+SET(CMAKE_SCRIPTS "${CMAKE_SOURCE_DIR}/cmake")
+SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/modules")
+
+## build settings
+SET(PAHO_VERSION_MAJOR 1)
+SET(PAHO_VERSION_MINOR 2)
+SET(PAHO_VERSION_PATCH 0)
+SET(CLIENT_VERSION ${PAHO_VERSION_MAJOR}.${PAHO_VERSION_MINOR}.${PAHO_VERSION_PATCH})
+
+INCLUDE(GNUInstallDirs)
+
+STRING(TIMESTAMP BUILD_TIMESTAMP UTC)
+MESSAGE(STATUS "Timestamp is ${BUILD_TIMESTAMP}")
+
+IF(WIN32)
+  ADD_DEFINITIONS(-D_CRT_SECURE_NO_DEPRECATE -DWIN32_LEAN_AND_MEAN -MD)
+ELSEIF(${CMAKE_SYSTEM_NAME} STREQUAL "Darwin")
+  ADD_DEFINITIONS(-DOSX)
+ENDIF()
+
+## build options
+SET(PAHO_WITH_SSL TRUE CACHE BOOL "Flag that defines whether to build ssl-enabled binaries too. ")
+SET(PAHO_BUILD_STATIC FALSE CACHE BOOL "Build static library")
+SET(PAHO_BUILD_DOCUMENTATION FALSE CACHE BOOL "Create and install the HTML based API documentation (requires Doxygen)")
+SET(PAHO_BUILD_SAMPLES FALSE CACHE BOOL "Build sample programs")
+SET(PAHO_BUILD_DEB_PACKAGE FALSE CACHE BOOL "Build debian package")
+SET(PAHO_ENABLE_TESTING FALSE CACHE BOOL "Build tests and run")
+
+ADD_SUBDIRECTORY(src)
+IF(PAHO_BUILD_SAMPLES)
+ADD_SUBDIRECTORY(src/samples)
+ENDIF()
+
+IF(PAHO_BUILD_DOCUMENTATION)
+ADD_SUBDIRECTORY(doc)
+ENDIF()
+
+### packaging settings
+IF (WIN32)
+SET(CPACK_GENERATOR "ZIP")
+ELSEIF(PAHO_BUILD_DEB_PACKAGE)
+SET(CPACK_GENERATOR "DEB")
+CONFIGURE_FILE(${CMAKE_SCRIPTS}/CPackDebConfig.cmake.in
+${CMAKE_BINARY_DIR}/CPackDebConfig.cmake @ONLY)
+SET(CPACK_PROJECT_CONFIG_FILE ${CMAKE_BINARY_DIR}/CPackDebConfig.cmake)
+ADD_SUBDIRECTORY(debian)
+ELSE()
+SET(CPACK_GENERATOR "TGZ")
+ENDIF()
+
+SET(CPACK_PACKAGE_VERSION_MAJOR ${PAHO_VERSION_MAJOR})
+SET(CPACK_PACKAGE_VERSION_MINOR ${PAHO_VERSION_MINOR})
+SET(CPACK_PACKAGE_VERSION_PATCH ${PAHO_VERSION_PATCH})
+INCLUDE(CPack)
+
+IF(PAHO_ENABLE_TESTING)
--- End diff --

We probably don't need the tests, right?




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312325#comment-16312325
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794068
  
--- Diff: extensions/mqtt/ConsumeMQTT.h ---
@@ -0,0 +1,125 @@
+/**
+ * @file ConsumeMQTT.h
+ * ConsumeMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONSUME_MQTT_H__
+#define __CONSUME_MQTT_H__
+
+#include 
+#include 
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
+#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
+
+// ConsumeMQTT Class
+class ConsumeMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit ConsumeMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<ConsumeMQTT>::getLogger()) {
+isSubscriber_ = true;
+maxQueueSize_ = 100;
+  }
+  // Destructor
+  virtual ~ConsumeMQTT() {
+std::lock_guard < std::mutex > lock(mutex_);
+while (!queue_.empty()) {
+  MQTTClient_message *message = queue_.front();
+  MQTTClient_freeMessage(&message);
+  queue_.pop_front();
+}
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "ConsumeMQTT";
+  // Supported Properties
+  static core::Property MaxQueueSize;
+  // Nest Callback Class for write stream
+  class WriteCallback: public OutputStreamCallback {
+  public:
+WriteCallback(MQTTClient_message *message) :
+  message_(message) {
+  status_ = 0;
+}
+MQTTClient_message *message_;
+int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
+  if (len < 0)
+status_ = -1;
+  return len;
+}
+int status_;
+  };
+
+public:
+  /**
+   * Function that's executed when the processor is scheduled.
+   * @param context process context.
+   * @param sessionFactory process session factory that is used when creating
+   * ProcessSession objects.
+   */
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  // OnTrigger method, implemented by NiFi ConsumeMQTT
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  // Initialize, over write by NiFi ConsumeMQTT
+  virtual void initialize(void);
+  virtual bool enqueueReceiveMQTTMsg(MQTTClient_message *message);
+
+protected:
+  void getReceivedMQTTMsg(std::deque<MQTTClient_message *> &msg_queue) {
+std::lock_guard < std::mutex > lock(mutex_);
--- End diff --

Could you use a lock-free queue here? It might not save much, but it may
increase throughput, and we already have one in our code base.
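
To illustrate the suggestion, here is a minimal sketch of a bounded lock-free
queue. It is not the queue that already exists in the code base, and it
assumes exactly one producer thread (for example the thread delivering MQTT
messages) and exactly one consumer thread (for example the one running
onTrigger). That assumption is mine; if either side can run on several
threads, this single-producer/single-consumer design does not apply. SpscQueue
and its method names are hypothetical. In the PR the element type would
presumably be MQTTClient_message *.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Single-producer/single-consumer bounded queue. Correct only when exactly
// one thread calls try_push and exactly one thread calls try_pop.
template <typename T>
class SpscQueue {
 public:
  // One slot is kept empty to tell "full" apart from "empty".
  explicit SpscQueue(std::size_t capacity)
      : slots_(capacity + 1), head_(0), tail_(0) {
    assert(capacity > 0);
  }

  // Producer side. Returns false when the queue is full.
  bool try_push(const T &value) {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % slots_.size();
    if (next == head_.load(std::memory_order_acquire)) {
      return false;
    }
    slots_[tail] = value;
    // Publish the slot only after the value has been written.
    tail_.store(next, std::memory_order_release);
    return true;
  }

  // Consumer side. Returns false when the queue is empty.
  bool try_pop(T &out) {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire)) {
      return false;
    }
    out = slots_[head];
    // Release the slot back to the producer after the value has been read.
    head_.store((head + 1) % slots_.size(), std::memory_order_release);
    return true;
  }

 private:
  std::vector<T> slots_;
  std::atomic<std::size_t> head_;  // next slot to read, owned by the consumer
  std::atomic<std::size_t> tail_;  // next slot to write, owned by the producer
};
```

A full queue makes try_push return false, which maps onto the existing
maxQueueSize_ idea: the caller decides whether to drop or retry the message.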




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312323#comment-16312323
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

Github user phrocker commented on a diff in the pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159794354
  
--- Diff: extensions/mqtt/PublishMQTT.h ---
@@ -0,0 +1,142 @@
+/**
+ * @file PublishMQTT.h
+ * PublishMQTT class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PUBLISH_MQTT_H__
+#define __PUBLISH_MQTT_H__
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/Property.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "MQTTClient.h"
+#include "AbstractMQTTProcessor.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PublishMQTT Class
+class PublishMQTT: public processors::AbstractMQTTProcessor {
+public:
+  // Constructor
+  /*!
+   * Create a new processor
+   */
+  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
+: processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
+retain_ = false;
+max_seg_size_ = ULLONG_MAX;
+  }
+  // Destructor
+  virtual ~PublishMQTT() {
+  }
+  // Processor Name
+  static constexpr char const* ProcessorName = "PublishMQTT";
+  // Supported Properties
+  static core::Property Retain;
+  static core::Property MaxFlowSegSize;
+
+  // Nest Callback Class for read stream
+  class ReadCallback: public InputStreamCallback {
+  public:
+ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client,
+int qos, bool retain, MQTTClient_deliveryToken &token) :
+flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client),
+qos_(qos), retain_(retain), token_(token) {
+  status_ = 0;
+  read_size_ = 0;
+}
+~ReadCallback() {
+}
+int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  if (flow_size_ < max_seg_size_)
+max_seg_size_ = flow_size_;
+  std::vector<unsigned char> buffer;
+  buffer.reserve(max_seg_size_);
+  read_size_ = 0;
+  status_ = 0;
+  while (read_size_ < flow_size_) {
+int readRet = stream->read(&buffer[0], max_seg_size_);
+if (readRet < 0) {
+  status_ = -1;
+  return read_size_;
+}
+if (readRet > 0) {
+  MQTTClient_message pubmsg = MQTTClient_message_initializer;
+  pubmsg.payload = &buffer[0];
+  pubmsg.payloadlen = readRet;
+  pubmsg.qos = qos_;
+  pubmsg.retained = retain_;
+  if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
--- End diff --

Does the publish call copy the buffer? If not, does it complete before
returning, or does it register a callback? I ask because you're passing the
buffer in as the payload; if delivery finishes via a callback, that memory
could be freed when this function exits while the MQTTClient still has the
publish in progress.
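
To make the lifetime concern concrete, here is a minimal sketch under stated assumptions. `DeferredClient`, `PendingMessage`, and `publish_segment` are hypothetical names invented for illustration; they are not part of Paho or of this PR. The sketch models a client that reads the payload only after the publishing function has returned, and shows one way to stay safe in that case: hand the client shared ownership of the bytes instead of a pointer into a function-local buffer. Whether `MQTTClient_publishMessage` itself copies the payload still needs to be confirmed against the Paho documentation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical pending-delivery record: the shared_ptr keeps the bytes alive
// for as long as the client still needs to read them.
struct PendingMessage {
  std::shared_ptr<std::vector<uint8_t>> owner;
  const uint8_t *data;
  std::size_t len;
};

// Hypothetical stand-in for a client that delivers asynchronously. It does
// not touch the payload inside publish(); it reads it later in deliver_all().
class DeferredClient {
 public:
  void publish(std::shared_ptr<std::vector<uint8_t>> payload) {
    // Read pointer and size before moving the handle into the queue.
    const uint8_t *data = payload->data();
    std::size_t len = payload->size();
    pending_.push_back(PendingMessage{std::move(payload), data, len});
  }

  // Simulates delivery that happens after the caller's scope has ended.
  std::vector<std::string> deliver_all() {
    std::vector<std::string> delivered;
    for (const auto &m : pending_) {
      delivered.emplace_back(m.data, m.data + m.len);
    }
    pending_.clear();
    return delivered;
  }

 private:
  std::deque<PendingMessage> pending_;
};

// Mirrors the shape of ReadCallback::process(): a segment is prepared inside
// the function, handed to the client, and then the function returns.
void publish_segment(DeferredClient &client, const std::string &segment) {
  auto payload =
      std::make_shared<std::vector<uint8_t>>(segment.begin(), segment.end());
  client.publish(payload);
}  // The local handle is released here; the queued handle keeps the bytes valid.
```

If the real client turns out to copy the payload, or to finish delivery before the publish call returns, none of this is needed and the stack-local buffer in the diff is fine as written.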




[jira] [Commented] (MINIFICPP-342) MQTT framework

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311654#comment-16311654
 ] 

ASF GitHub Bot commented on MINIFICPP-342:
--

GitHub user minifirocks opened a pull request:

https://github.com/apache/nifi-minifi-cpp/pull/228

MINIFICPP-342: MQTT extension

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced
 in the commit message?

- [ ] Does your PR title start with MINIFI-XXXX where XXXX is the JIRA
number you are trying to resolve? Pay particular attention to the hyphen "-"
character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/minifirocks/nifi-minifi-cpp mqtt_dev

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi-minifi-cpp/pull/228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #228


commit cc47483e04164102b2382a78bf68d10f4e8f5efe
Author: Bin Qiu 
Date:   2018-01-04T16:23:14Z

MINIFICPP-342: MQTT extension



