[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404476310
 
 

 ##
 File path: server/src/main/java/io/moquette/broker/MQTTConnection.java
 ##
 @@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.moquette.broker;
+
+import io.moquette.broker.subscriptions.Topic;
+import io.moquette.broker.security.IAuthenticator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+
+// NOTE:
+// override the MQTTConnection class in the moquette 0.12.1 jar to fix the PUBACK flush issue
+// https://github.com/moquette-io/moquette/pull/454
+// when moquette fixed version released, we can remove this.
+final class MQTTConnection {
 
 Review comment:
   done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404474266
 
 

 ##
 File path: server/src/main/java/io/moquette/broker/MQTTConnection.java
 ##
 @@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.moquette.broker;
+
+import io.moquette.broker.subscriptions.Topic;
+import io.moquette.broker.security.IAuthenticator;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.*;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.*;
+
+// NOTE:
+// override the MQTTConnection class in the moquette 0.12.1 jar to fix the PUBACK flush issue
+// https://github.com/moquette-io/moquette/pull/454
+// when moquette fixed version released, we can remove this.
+final class MQTTConnection {
 
 Review comment:
   OK




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404071694
 
 

 ##
 File path: example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.util.Random;
+
+public class MQTTClient {
+public static void main(String[] args) throws Exception {
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+String payload = String.format("{\n" +
+"\"device\":\"root.sg.d1\",\n" +
+"\"timestamp\":%d,\n" +
+"\"measurements\":[\"s1\"],\n" +
+"\"values\":[%f]\n" +
+"}", System.currentTimeMillis(), random.nextDouble());
+
+connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+Thread.sleep(1000);
 
 Review comment:
   done.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404070226
 
 

 ##
 File path: 
server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
 ##
 @@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.db.mqtt.JSONPayloadFormatter
 
 Review comment:
   Also added unit tests and docs.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404069815
 
 

 ##
 File path: 
server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
 ##
 @@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.db.mqtt.JSONPayloadFormatter
 
 Review comment:
   I updated the code so that the default `json` formatter supports both of the following JSON formats:
   ```json
   {
     "device":"root.sg.d1",
     "timestamp":1586076045524,
     "measurements":["s1","s2"],
     "values":[0.530635,0.530635]
   }
   ```
   or
   ```json
   {
     "device":"root.sg.d1",
     "timestamps":[1586076045524,1586076065526],
     "measurements":["s1","s2"],
     "values":[[0.530635,0.530635], [0.530655,0.530695]]
   }
   ```
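
As a rough illustration of the two payload shapes (not code from this PR), the sketch below builds each one as a plain string. The class `PayloadShapes` and its method names are hypothetical. It also assumes that, in the second format, each inner `values` array belongs to the timestamp at the same index, which the comment does not spell out.

```java
import java.util.Locale;

// Hypothetical helper, not part of IoTDB: builds the two payload shapes as JSON strings.
final class PayloadShapes {

    // Renders measurement names as a JSON array of strings.
    static String names(String[] measurements) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < measurements.length; i++) {
            if (i > 0) sb.append(',');
            sb.append('"').append(measurements[i]).append('"');
        }
        return sb.append(']').toString();
    }

    // Renders one row of values; Locale.ROOT keeps '.' as the decimal separator.
    static String row(double[] values) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(String.format(Locale.ROOT, "%f", values[i]));
        }
        return sb.append(']').toString();
    }

    // First shape: a single "timestamp" with one value per measurement.
    static String single(String device, long timestamp, String[] measurements, double[] values) {
        return "{\"device\":\"" + device + "\",\"timestamp\":" + timestamp
            + ",\"measurements\":" + names(measurements)
            + ",\"values\":" + row(values) + "}";
    }

    // Second shape: several "timestamps"; values[i] is ASSUMED to be the row for timestamps[i].
    static String batch(String device, long[] timestamps, String[] measurements, double[][] values) {
        StringBuilder ts = new StringBuilder("[");
        StringBuilder rows = new StringBuilder("[");
        for (int i = 0; i < timestamps.length; i++) {
            if (i > 0) {
                ts.append(',');
                rows.append(',');
            }
            ts.append(timestamps[i]);
            rows.append(row(values[i]));
        }
        return "{\"device\":\"" + device + "\",\"timestamps\":" + ts.append(']')
            + ",\"measurements\":" + names(measurements)
            + ",\"values\":" + rows.append(']') + "}";
    }

    public static void main(String[] args) {
        String[] m = {"s1", "s2"};
        System.out.println(single("root.sg.d1", 1586076045524L, m, new double[] {0.530635, 0.530635}));
        System.out.println(batch("root.sg.d1", new long[] {1586076045524L, 1586076065526L}, m,
            new double[][] {{0.530635, 0.530635}, {0.530655, 0.530695}}));
    }
}
```

Either string could then be passed to `connection.publish(...)` as in the example client, after converting it with `getBytes()`.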




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-06 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r404068469
 
 

 ##
 File path: example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.util.Random;
+
+public class MQTTClient {
+public static void main(String[] args) throws Exception {
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+String payload = String.format("{\n" +
+"\"device\":\"root.sg.d1\",\n" +
+"\"timestamp\":%d,\n" +
+"\"measurements\":[\"s1\"],\n" +
+"\"values\":[%f]\n" +
+"}", System.currentTimeMillis(), random.nextDouble());
+
+connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+Thread.sleep(1000);
 
 Review comment:
   OK




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-05 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403804449
 
 

 ##
 File path: example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import java.util.Random;
+
+public class MQTTClient {
+public static void main(String[] args) throws Exception {
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+String payload = String.format("{\n" +
+"\"device\":\"root.sg.d1\",\n" +
+"\"timestamp\":%d,\n" +
+"\"measurements\":[\"s1\"],\n" +
+"\"values\":[%f]\n" +
+"}", System.currentTimeMillis(), random.nextDouble());
+
+connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+Thread.sleep(1000);
 
 Review comment:
   This is a known PUBACK flush issue in moquette that occurs when the QoS is AT_LEAST_ONCE or EXACTLY_ONCE. It has been fixed upstream, but the fix has not been released yet:
   https://github.com/moquette-io/moquette/issues/515
   Don't worry, I have already fixed it for the IoTDB MQTT service. Please try again.
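
For context on why a missing flush matters at these QoS levels: a blocking client typically waits for the acknowledgement, so an ack that is queued but never flushed can look like a hang. The sketch below is only an analogy built on `java.io` buffering. It is not moquette or Netty code and not the upstream patch, and `FlushDemo` is a made-up name.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Analogy only: java.io streams stand in for the broker's network channel.
final class FlushDemo {

    // Writes a 4-byte MQTT PUBACK (fixed header 0x40 0x02, packet id 1) through a
    // buffered stream and returns how many bytes reached the underlying "socket".
    static int bytesDelivered(boolean flush) {
        ByteArrayOutputStream socket = new ByteArrayOutputStream();
        BufferedOutputStream out = new BufferedOutputStream(socket);
        try {
            out.write(new byte[] {0x40, 0x02, 0x00, 0x01});
            if (flush) {
                out.flush(); // without this call the ack stays in the buffer
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return socket.size();
    }

    public static void main(String[] args) {
        System.out.println("without flush: " + bytesDelivered(false));
        System.out.println("with flush: " + bytesDelivered(true));
    }
}
```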




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-05 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403804539
 
 

 ##
 File path: 
server/src/main/resources/META-INF/services/org.apache.iotdb.db.mqtt.PayloadFormatter
 ##
 @@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iotdb.db.mqtt.JSONPayloadFormatter
 
 Review comment:
   Users can implement the `PayloadFormatter` interface and change the `mqtt_payload_formatter` config if the default JSON formatter, `JSONPayloadFormatter`, doesn't meet their needs.
   
   For the scenario you described, a user might implement a `BatchJSONPayloadFormatter` and name it `batchjson`.
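
A minimal sketch of that idea, under stated assumptions: `NamedFormatter` is a hypothetical stand-in for the real `PayloadFormatter` interface (whose exact methods are not shown in this thread), and a plain map replaces SPI discovery, which real code would presumably do with `java.util.ServiceLoader` and the `META-INF/services` file from this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class FormatterSketch {

    // Hypothetical stand-in for PayloadFormatter; the real interface may differ.
    interface NamedFormatter {
        String getName();

        String format(byte[] payload);
    }

    // Plays the role of the default formatter selected by the name "json".
    static final class JsonFormatter implements NamedFormatter {
        public String getName() {
            return "json";
        }

        public String format(byte[] payload) {
            // Placeholder: a real formatter would parse the payload into events.
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical user-supplied formatter selected by the name "batchjson".
    static final class BatchJsonFormatter implements NamedFormatter {
        public String getName() {
            return "batchjson";
        }

        public String format(byte[] payload) {
            // Placeholder: only tags the decoded text so the two are distinguishable.
            return "batch:" + new String(payload, StandardCharsets.UTF_8);
        }
    }

    // Stand-in for SPI discovery: maps each formatter's name to its instance.
    static final Map<String, NamedFormatter> REGISTRY = new HashMap<>();

    static {
        register(new JsonFormatter());
        register(new BatchJsonFormatter());
    }

    static void register(NamedFormatter formatter) {
        REGISTRY.put(formatter.getName(), formatter);
    }

    // Resolves the formatter named by a config value such as mqtt_payload_formatter.
    static NamedFormatter lookup(String configuredName) {
        NamedFormatter formatter = REGISTRY.get(configuredName);
        if (formatter == null) {
            throw new IllegalArgumentException("unknown payload formatter: " + configuredName);
        }
        return formatter;
    }
}
```

With this shape, switching the config value from `json` to `batchjson` selects the custom implementation without touching the handler code.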




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-05 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403716319
 
 

 ##
 File path: docs/UserGuide/3-Server/6-MQTT Protocol.md
 ##
 @@ -0,0 +1,91 @@
+
+# MQTT Protocol
 
 Review comment:
   Makes sense, just like JDBC or the Session API. Will update.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-05 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403670885
 
 

 ##
 File path: docs/UserGuide/3-Server/6-MQTT Protocol.md
 ##
 @@ -0,0 +1,82 @@
+
+# MQTT Protocol
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
+It was designed as an extremely lightweight publish/subscribe messaging transport.
+It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
+IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
+
+https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png;>
+
+
+## Built-in MQTT Service
+The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
+ and then write the data into storage immediately. 
+The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the default implementation is `JSONPayloadFormatter`.
+
+https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png;>
+
+## MQTT Configurations
+The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
+
+Configurations are as following:
+
+| NAME| DESCRIPTION   | DEFAULT  |
+| - |:-:|:--:|
+| enable_mqtt_service  | whether to enable the mqtt service | true |
+| mqtt_host  | the mqtt service binding host | 0.0.0.0 |
+| mqtt_port  | the mqtt service binding port|   1883 |
+| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages |1 |
+| mqtt_payload_formatter | the mqtt message payload formatter |json |
+
+
+## Examples
+The following is an example which a mqtt client send messages to IoTDB server.
+
+ ```java
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+Map tuple = new HashMap();
+tuple.put("device", "root.sg.d1");
+tuple.put("timestamp", System.currentTimeMillis());
+tuple.put("measurements", "s1");
+tuple.put("values", random.nextDouble());
+
+String payload = JSON.toJSONString(tuple);
+connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
 
 Review comment:
   I added some documentation to make it clear.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-04 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403645959
 
 

 ##
 File path: docs/UserGuide/3-Server/6-MQTT Protocol.md
 ##
 @@ -0,0 +1,82 @@
+
+# MQTT Protocol
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
+It was designed as an extremely lightweight publish/subscribe messaging transport.
+It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
+IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
+
+https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png;>
+
+
+## Built-in MQTT Service
+The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
+ and then write the data into storage immediately. 
+The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the default implementation is `JSONPayloadFormatter`.
+
+https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png;>
+
+## MQTT Configurations
+The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
+
+Configurations are as following:
+
+| NAME| DESCRIPTION   | DEFAULT  |
+| - |:-:|:--:|
+| enable_mqtt_service  | whether to enable the mqtt service | true |
+| mqtt_host  | the mqtt service binding host | 0.0.0.0 |
+| mqtt_port  | the mqtt service binding port|   1883 |
+| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages |1 |
+| mqtt_payload_formatter | the mqtt message payload formatter |json |
+
+
+## Examples
+The following is an example which a mqtt client send messages to IoTDB server.
+
+ ```java
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+Map tuple = new HashMap();
+tuple.put("device", "root.sg.d1");
+tuple.put("timestamp", System.currentTimeMillis());
+tuple.put("measurements", "s1");
+tuple.put("values", random.nextDouble());
+
+String payload = JSON.toJSONString(tuple);
+connection.publish("root.sg.d1.s1", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
 
 Review comment:
   The first parameter isn't the client ID. It's the MQTT topic, which corresponds to the IoTDB timeseries.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-04 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403645691
 
 

 ##
 File path: docs/UserGuide/3-Server/6-MQTT Protocol.md
 ##
 @@ -0,0 +1,82 @@
+
+# MQTT Protocol
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
+It was designed as an extremely lightweight publish/subscribe messaging transport.
+It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
+IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
+
+https://user-images.githubusercontent.com/6711230/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png;>
+
+
+## Built-in MQTT Service
+The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
+ and then write the data into storage immediately. 
+The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the default implementation is `JSONPayloadFormatter`.
+
+https://user-images.githubusercontent.com/6711230/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png;>
+
+## MQTT Configurations
+The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-engine.properties` by default.
+
+Configurations are as following:
+
+| NAME| DESCRIPTION   | DEFAULT  |
+| - |:-:|:--:|
+| enable_mqtt_service  | whether to enable the mqtt service | true |
+| mqtt_host  | the mqtt service binding host | 0.0.0.0 |
+| mqtt_port  | the mqtt service binding port|   1883 |
+| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages |1 |
+| mqtt_payload_formatter | the mqtt message payload formatter |json |
+
+
+## Examples
+The following is an example which a mqtt client send messages to IoTDB server.
+
+ ```java
+MQTT mqtt = new MQTT();
+mqtt.setHost("127.0.0.1", 1883);
+mqtt.setUserName("root");
+mqtt.setPassword("root");
+
+BlockingConnection connection = mqtt.blockingConnection();
+connection.connect();
+
+Random random = new Random();
+for (int i = 0; i < 10; i++) {
+Map tuple = new HashMap();
+tuple.put("device", "root.sg.d1");
+tuple.put("timestamp", System.currentTimeMillis());
+tuple.put("measurements", "s1");
+tuple.put("values", random.nextDouble());
+
+String payload = JSON.toJSONString(tuple);
 
 Review comment:
   Good catch, will update.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-04-04 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol 
Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r403645652
 
 

 ##
 File path: server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.mqtt;
+
+import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.executor.IPlanExecutor;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishHandler handle the messages from MQTT clients.
+ */
+public class PublishHandler extends AbstractInterceptHandler {
+private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
+
+private IPlanExecutor executor;
+private PayloadFormatter payloadFormat;
+
+public PublishHandler(IoTDBConfig config) {
+this.payloadFormat = PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
+try {
+this.executor = new PlanExecutor();
+} catch (QueryProcessException e) {
+throw new RuntimeException(e);
+}
+}
+
+protected PublishHandler(IPlanExecutor executor, PayloadFormatter payloadFormat) {
+this.executor = executor;
+this.payloadFormat = payloadFormat;
+}
+
+@Override
+public String getID() {
+return "iotdb-mqtt-broker-listener";
+}
+
+@Override
+public void onPublish(InterceptPublishMessage msg) {
+String clientId = msg.getClientID();
+ByteBuf payload = msg.getPayload();
+String topic = msg.getTopicName();
+String username = msg.getUsername();
+MqttQoS qos = msg.getQos();
+
+LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}",
+clientId, username, qos, topic, payload);
+
+Message event = payloadFormat.format(payload);
+if (event == null) {
+return;
+}
+
+InsertPlan plan = new InsertPlan();
+plan.setDeviceId(event.getDevice());
+plan.setTime(event.getTimestamp());
+plan.setMeasurements(event.getMeasurements().toArray(new String[event.getMeasurements().size()]));
+plan.setValues(event.getValues().toArray(new String[event.getValues().size()]));
+
+boolean status;
+try {
+status = executeNonQuery(plan);
+} catch (QueryProcessException e) {
+throw new RuntimeException(e);
+}
+
+LOG.debug("event process result: {}", status);
+}
+
+private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
+if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+throw new QueryProcessException(
+"Current system mode is read-only, does not support non-query operation");
 
 Review comment:
   why?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-03-22 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r396087219
 
 

 ##
 File path: mqtt/src/main/java/org/apache/iotdb/mqtt/PublishHandler.java
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.mqtt;
+
+import io.moquette.interception.AbstractInterceptHandler;
+import io.moquette.interception.messages.InterceptPublishMessage;
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.session.IoTDBSessionException;
+import org.apache.iotdb.session.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PublishHandler handle the messages from MQTT clients.
+ */
+public class PublishHandler extends AbstractInterceptHandler {
+private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
+
+private Session session;
+private PayloadFormatter payloadFormat;
+static boolean testing = false;
+
+public PublishHandler(MQTTBrokerConfig config) {
+payloadFormat = PayloadFormatManager.getPayloadFormat(config.getPayloadFormatter());
+initSession(config);
+}
+
+public void initSession(MQTTBrokerConfig config) {
+if (testing) {
+return;
+}
+session = new Session(config.getIotDBHost(), config.getIotDBPort(),
+config.getIotDBUsername(), config.getIotDBPassword());
+try {
+session.open();
+} catch (IoTDBSessionException e) {
 
 Review comment:
   Updated.




[GitHub] [incubator-iotdb] vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support

2020-03-21 Thread GitBox
vesense commented on a change in pull request #929: [IOTDB-565] MQTT Protocol Support
URL: https://github.com/apache/incubator-iotdb/pull/929#discussion_r395989324
 
 

 ##
 File path: pom.xml
 ##
 @@ -86,6 +87,7 @@
 3.8.1
 1.1.3
 21.0
+1.2.31
 
 Review comment:
   Good catch.

