OneSizeFitsQuorum commented on code in PR #12113:
URL: https://github.com/apache/iotdb/pull/12113#discussion_r1535602708
##########
iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java:
##########
@@ -31,6 +31,8 @@ public class SessionConfig {
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;
public static final boolean DEFAULT_REDIRECTION_MODE = true;
Review Comment:
remove this line
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -1901,6 +1905,18 @@ public void insertRecords(
throw new IllegalArgumentException(
"deviceIds, times, measurementsList and valuesList's size should be equal");
}
+
+ if (enableRecordsConvertTablet) {
+ // judge if convert records to tablets.
+ Set<String> deviceSet = new HashSet<>(deviceIds);
+
+ if ((double) deviceSet.size() / deviceIds.size() <= 0.5) {
Review Comment:
Do not use a magic number; extract 0.5 into a named constant.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java:
##########
@@ -3505,6 +3510,8 @@ public static class Builder {
private boolean enableCompression = false;
private ZoneId zoneId = null;
private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
+
Review Comment:
do not add empty line
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2737,6 +2706,143 @@ private void updateTSInsertTabletsReq(
request.addToSizeList(tablet.rowSize);
}
+ // convert records of one device to tablet and insert
+ public void convertToTabletAndInsert(
+ String deviceId,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // measurement -> type
+ Map<String, TSDataType> measurementType = new HashMap<>();
+ // build measurementType
+ for (int rowIndex = 0; rowIndex < measurementsList.size(); rowIndex++) {
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ if (!measurementType.containsKey(measurements.get(colIndex))) {
Review Comment:
1. Compute measurements.get(colIndex) once and reuse the result.
2. Use the computeIfAbsent API.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java:
##########
@@ -112,6 +112,8 @@ public class SessionPool implements ISessionPool {
private String trustStorePwd;
private ZoneId zoneId;
private boolean enableRedirection;
+
Review Comment:
Same as above: do not add empty line.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2737,6 +2706,143 @@ private void updateTSInsertTabletsReq(
request.addToSizeList(tablet.rowSize);
}
+ // convert records of one device to tablet and insert
+ public void convertToTabletAndInsert(
+ String deviceId,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // measurement -> type
+ Map<String, TSDataType> measurementType = new HashMap<>();
+ // build measurementType
+ for (int rowIndex = 0; rowIndex < measurementsList.size(); rowIndex++) {
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ if (!measurementType.containsKey(measurements.get(colIndex))) {
+ measurementType.put(measurements.get(colIndex), types.get(colIndex));
+ }
+ }
+ }
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<String> measurementList = new ArrayList<>();
+ // use measurementType to build schemaList
+ for (Entry<String, TSDataType> entry : measurementType.entrySet()) {
+ schemaList.add(new MeasurementSchema(entry.getKey(), entry.getValue()));
+ measurementList.add(entry.getKey());
+ }
+ // build tablet and insert
+ Tablet tablet = new Tablet(deviceId, schemaList, times.size());
+ for (int rowIndex = 0; rowIndex < times.size(); rowIndex++) {
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ measurementList);
+ }
+ if (isAligned) {
+ insertAlignedTablet(tablet);
+ } else {
+ insertTablet(tablet);
+ }
+ }
+
+ // convert records of multiple devices to tablets and insert
+ public void convertToTabletsAndInsert(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ int deviceSize,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // device -> measurement -> type
+ Map<String, Map<String, TSDataType>> measurementTypeMap = new HashMap<>(deviceSize + 1, 1);
+ // device -> row count
+ Map<String, Integer> rowMap = new HashMap<>(deviceSize + 1, 1);
+ // first we should build measurementTypeMap and rowMap
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ Map<String, TSDataType> measurementType =
+ measurementTypeMap.computeIfAbsent(device, k -> new HashMap<>());
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ if (!measurementType.containsKey(measurements.get(colIndex))) {
+ measurementType.put(measurements.get(colIndex), types.get(colIndex));
+ }
+ }
+ rowMap.merge(device, 1, Integer::sum);
+ }
+ // device -> schema
+ Map<String, List<MeasurementSchema>> schemaMap = new HashMap<>(deviceSize + 1, 1);
+ // device -> measurement
+ Map<String, List<String>> measurementMap = new HashMap<>(deviceSize + 1, 1);
+ // use measurementTypeMap to build schemaMap and measurementMap
+ for (Map.Entry<String, Map<String, TSDataType>> entry : measurementTypeMap.entrySet()) {
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<String> measurementList = new ArrayList<>();
+ for (Map.Entry<String, TSDataType> schemaEntry : entry.getValue().entrySet()) {
+ schemaList.add(new MeasurementSchema(schemaEntry.getKey(), schemaEntry.getValue()));
+ measurementList.add(schemaEntry.getKey());
+ }
+ schemaMap.put(entry.getKey(), schemaList);
+ measurementMap.put(entry.getKey(), measurementList);
+ }
+ // device -> tablet
+ Map<String, Tablet> tablets = new HashMap<>(deviceSize + 1, 1);
+ // use schemaMap and rowMap to build tablets and insert
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ Tablet tablet =
+ tablets.computeIfAbsent(
+ device, k -> new Tablet(device, schemaMap.get(device), rowMap.get(device)));
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ measurementMap.get(device));
+ }
+ if (isAligned) {
+ insertAlignedTablets(tablets);
+ } else {
+ insertTablets(tablets);
+ }
+ }
+
+ // add one record to tablet.
+ public void addRecordToTablet(
+ Tablet tablet,
+ Long timestamp,
+ List<String> measurements,
+ List<Object> values,
+ List<String> allMeasurements) {
+ int row = tablet.rowSize++;
+ tablet.addTimestamp(row, timestamp);
+ // tablet without null value
+ if (measurements.size() == allMeasurements.size()) {
+ for (int i = 0; i < measurements.size(); i++) {
+ tablet.addValue(measurements.get(i), row, values.get(i));
+ }
+ return;
+ }
+ // tablet with null value
+ Map<String, Object> measurementValueMap = new HashMap<>(measurements.size() + 1, 1);
Review Comment:
optimize
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -142,6 +144,7 @@ public class Session implements ISession {
// Cluster version cache
protected boolean enableRedirection;
+ protected boolean enableRecordsConvertTablet;
Review Comment:
Rename this field to enableRecordsAutoConvertTablet.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2737,6 +2706,143 @@ private void updateTSInsertTabletsReq(
request.addToSizeList(tablet.rowSize);
}
+ // convert records of one device to tablet and insert
+ public void convertToTabletAndInsert(
+ String deviceId,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // measurement -> type
+ Map<String, TSDataType> measurementType = new HashMap<>();
+ // build measurementType
+ for (int rowIndex = 0; rowIndex < measurementsList.size(); rowIndex++) {
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ if (!measurementType.containsKey(measurements.get(colIndex))) {
+ measurementType.put(measurements.get(colIndex), types.get(colIndex));
+ }
+ }
+ }
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<String> measurementList = new ArrayList<>();
+ // use measurementType to build schemaList
+ for (Entry<String, TSDataType> entry : measurementType.entrySet()) {
+ schemaList.add(new MeasurementSchema(entry.getKey(), entry.getValue()));
+ measurementList.add(entry.getKey());
+ }
+ // build tablet and insert
+ Tablet tablet = new Tablet(deviceId, schemaList, times.size());
+ for (int rowIndex = 0; rowIndex < times.size(); rowIndex++) {
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ measurementList);
+ }
+ if (isAligned) {
+ insertAlignedTablet(tablet);
+ } else {
+ insertTablet(tablet);
+ }
+ }
+
+ // convert records of multiple devices to tablets and insert
+ public void convertToTabletsAndInsert(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ int deviceSize,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // device -> measurement -> type
+ Map<String, Map<String, TSDataType>> measurementTypeMap = new HashMap<>(deviceSize + 1, 1);
+ // device -> row count
+ Map<String, Integer> rowMap = new HashMap<>(deviceSize + 1, 1);
+ // first we should build measurementTypeMap and rowMap
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ Map<String, TSDataType> measurementType =
+ measurementTypeMap.computeIfAbsent(device, k -> new HashMap<>());
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ if (!measurementType.containsKey(measurements.get(colIndex))) {
+ measurementType.put(measurements.get(colIndex), types.get(colIndex));
+ }
+ }
+ rowMap.merge(device, 1, Integer::sum);
+ }
+ // device -> schema
+ Map<String, List<MeasurementSchema>> schemaMap = new HashMap<>(deviceSize + 1, 1);
Review Comment:
optimized
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -3549,6 +3655,8 @@ public static class Builder {
private int thriftDefaultBufferSize = SessionConfig.DEFAULT_INITIAL_BUFFER_CAPACITY;
private int thriftMaxFrameSize = SessionConfig.DEFAULT_MAX_FRAME_SIZE;
private boolean enableRedirection = SessionConfig.DEFAULT_REDIRECTION_MODE;
+
Review Comment:
Same as above: do not add empty line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]