jt2594838 commented on code in PR #16674:
URL: https://github.com/apache/iotdb/pull/16674#discussion_r2485162308


##########
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java:
##########
@@ -21,38 +21,57 @@
 
 import org.apache.iotdb.db.protocol.mqtt.Message;
 import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
 
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
 import io.netty.buffer.ByteBuf;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.external.commons.lang3.NotImplementedException;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
+/**
+ * The Customized JSON payload formatter. one json format supported: { 
"time":1586076045523,
+ * "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", 
"value":80.0 }
+ */
 public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+  private static final String JSON_KEY_TIME = "time";
+  private static final String JSON_KEY_DEVICEID = "deviceID";
+  private static final String JSON_KEY_DEVICETYPE = "deviceType";
+  private static final String JSON_KEY_POINT = "point";
+  private static final String JSON_KEY_VALUE = "value";
+  private static final Gson GSON = new GsonBuilder().create();
 
   @Override
   public List<Message> format(String topic, ByteBuf payload) {
-    // Suppose the payload is a json format
     if (payload == null) {
-      return Collections.emptyList();
+      return new ArrayList<>();
     }
-
-    // parse data from the json and generate Messages and put them into 
List<Message> ret
-    List<Message> ret = new ArrayList<>();
-    // this is just an example, so we just generate some Messages directly
-    for (int i = 0; i < 2; i++) {
-      long ts = i;
-      TreeMessage message = new TreeMessage();
-      message.setDevice("d" + i);
-      message.setTimestamp(ts);
-      message.setMeasurements(Arrays.asList("s1", "s2"));
-      message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
-      ret.add(message);
+    String txt = payload.toString(StandardCharsets.UTF_8);
+    JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
+    if (jsonElement.isJsonObject()) {
+      JsonObject jsonObject = jsonElement.getAsJsonObject();
+      return formatTableRow(topic, jsonObject);
+    } else if (jsonElement.isJsonArray()) {
+      JsonArray jsonArray = jsonElement.getAsJsonArray();
+      List<Message> messages = new ArrayList<>();
+      for (JsonElement element : jsonArray) {
+        JsonObject jsonObject = element.getAsJsonObject();
+        messages.addAll(formatTableRow(topic, jsonObject));
+      }
+      return messages;
     }
-    return ret;
+    throw new JsonParseException("payload is invalidate");

Review Comment:
   May provide the txt or the jsonElement.



##########
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedLinePayloadFormatter.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mqtt.server;
+
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedLinePayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(String topic, ByteBuf payload) {
+    // Suppose the payload is a line format
+    if (payload == null) {
+      return null;
+    }

Review Comment:
   Better to keep consistency between examples.



##########
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java:
##########
@@ -61,14 +80,109 @@ public List<Message> format(ByteBuf payload) {
     throw new NotImplementedException();
   }
 
+  private List<Message> formatTableRow(String topic, JsonObject jsonObject) {
+    TableMessage message = new TableMessage();
+    String database = !topic.contains("/") ? topic : topic.substring(0, 
topic.indexOf("/"));
+    String table = "test_table";

Review Comment:
   May parse the table name from the topic?



##########
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java:
##########
@@ -21,38 +21,57 @@
 
 import org.apache.iotdb.db.protocol.mqtt.Message;
 import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
 
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
 import io.netty.buffer.ByteBuf;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.external.commons.lang3.NotImplementedException;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
+/**
+ * The Customized JSON payload formatter. one json format supported: { 
"time":1586076045523,
+ * "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", 
"value":80.0 }
+ */
 public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
+  private static final String JSON_KEY_TIME = "time";
+  private static final String JSON_KEY_DEVICEID = "deviceID";
+  private static final String JSON_KEY_DEVICETYPE = "deviceType";
+  private static final String JSON_KEY_POINT = "point";
+  private static final String JSON_KEY_VALUE = "value";
+  private static final Gson GSON = new GsonBuilder().create();
 
   @Override
   public List<Message> format(String topic, ByteBuf payload) {
-    // Suppose the payload is a json format
     if (payload == null) {
-      return Collections.emptyList();
+      return new ArrayList<>();
     }

Review Comment:
   Explain this change



##########
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedLinePayloadFormatter.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mqtt.server;
+
+import org.apache.iotdb.db.protocol.mqtt.Message;
+import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
+import org.apache.iotdb.db.protocol.mqtt.TableMessage;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.tsfile.enums.TSDataType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CustomizedLinePayloadFormatter implements PayloadFormatter {
+
+  @Override
+  public List<Message> format(String topic, ByteBuf payload) {
+    // Suppose the payload is a line format
+    if (payload == null) {
+      return null;
+    }
+
+    String line = payload.toString(StandardCharsets.UTF_8);
+    // parse data from the line and generate Messages and put them into 
List<Meesage> ret
+    List<Message> ret = new ArrayList<>();
+    // this is just an example, so we just generate some Messages directly
+    for (int i = 0; i < 3; i++) {

Review Comment:
   The line should still be used, otherwise, this examples means nothing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to