THUMarkLau commented on code in PR #12113:
URL: https://github.com/apache/iotdb/pull/12113#discussion_r1527809612
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2737,6 +2755,85 @@ private void updateTSInsertTabletsReq(
request.addToSizeList(tablet.rowSize);
}
+ public void invertToTabletsAndInsert(
+ List<String> deviceIds,
+ List<Long> times,
+ List<List<String>> measurementsList,
+ List<List<TSDataType>> typesList,
+ List<List<Object>> valuesList,
+ boolean isAligned)
+ throws IoTDBConnectionException, StatementExecutionException {
+ // device -> tablet
+ Map<String, Tablet> tablets = new HashMap<>();
+ // device -> measurement
+ Map<String, List<String>> measurementMap = new HashMap<>();
+ // device -> {measurement -> type}
+ Map<String, Map<String, TSDataType>> measurementTypeMap = new HashMap<>();
+ // device -> row count
+ Map<String, Integer> rowMap = new HashMap<>();
+ // device -> schema
+ Map<String, List<MeasurementSchema>> schemaMap = new HashMap<>();
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ final Map<String, TSDataType> measurementType =
+ measurementTypeMap.computeIfAbsent(device, k -> new HashMap<>());
+ List<String> measurements = measurementsList.get(rowIndex);
+ List<TSDataType> types = typesList.get(rowIndex);
+ for (int colIndex = 0; colIndex < measurements.size(); colIndex++) {
+ int finalColIndex = colIndex;
+ measurementType.computeIfAbsent(measurements.get(colIndex), k ->
types.get(finalColIndex));
+ }
+ rowMap.merge(device, 1, Integer::sum);
+ }
+ for (Map.Entry<String, Map<String, TSDataType>> entry :
measurementTypeMap.entrySet()) {
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ List<String> measurementList = new ArrayList<>();
+ for (Map.Entry<String, TSDataType> schemaEntry :
entry.getValue().entrySet()) {
+ schemaList.add(new MeasurementSchema(schemaEntry.getKey(),
schemaEntry.getValue()));
+ measurementList.add(schemaEntry.getKey());
+ }
+ schemaMap.put(entry.getKey(), schemaList);
+ measurementMap.put(entry.getKey(), measurementList);
+ }
+
+ for (int rowIndex = 0; rowIndex < deviceIds.size(); rowIndex++) {
+ String device = deviceIds.get(rowIndex);
+ Tablet tablet =
+ tablets.computeIfAbsent(
+ device, k -> new Tablet(device, schemaMap.get(device),
rowMap.get(device)));
+ addRecordToTablet(
+ tablet,
+ times.get(rowIndex),
+ measurementsList.get(rowIndex),
+ valuesList.get(rowIndex),
+ measurementMap.get(device));
+ }
+
+ if (isAligned) {
+ insertAlignedTablets(tablets);
+ } else {
+ insertTablets(tablets);
+ }
+ }
+ // add one record to tablet.
+ public void addRecordToTablet(
+ Tablet tablet,
+ Long timestamp,
+ List<String> measurements,
+ List<Object> values,
+ List<String> allMeasurements) {
+ int row = tablet.rowSize++;
+ tablet.addTimestamp(row, timestamp);
+ Map<String, Object> measurementValueMap = new HashMap<>();
+ for (int i = 0; i < measurements.size(); i++) {
Review Comment:
Do we need to convert the data into a Map before adding it to the Tablet?
Perhaps it can be added directly to the Tablet because the Tablet defaults to
filling in nulls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]