jt2594838 commented on code in PR #13550:
URL: https://github.com/apache/iotdb/pull/13550#discussion_r1776200063
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2697,6 +2790,111 @@ public void insertRelationalTablet(Tablet tablet)
insertRelationalTablet(tablet, false);
}
+ private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+ if (tableModelDeviceIdToEndpoint.isEmpty()) {
+ relationalTabletGroup.put(defaultSessionConnection, tablet);
+ } else {
+ for (int i = 0; i < tablet.rowSize; i++) {
+ IDeviceID iDeviceID = tablet.getDeviceID(i);
+ final SessionConnection connection = getSessionConnection(iDeviceID);
+ int finalI = i;
+ relationalTabletGroup.compute(
+ connection,
+ (k, v) -> {
+ if (v == null) {
+ v =
+ new Tablet(
+ tablet.getTableName(),
+ tablet.getSchemas(),
+ tablet.getColumnTypes(),
+ tablet.rowSize);
+ }
+ for (int j = 0; j < v.getSchemas().size(); j++) {
+ v.addValue(
+ v.getSchemas().get(j).getMeasurementId(),
+ v.rowSize,
+ tablet.getValue(finalI, j));
+ }
+ v.addTimestamp(v.rowSize, tablet.timestamps[finalI]);
+ v.rowSize++;
+ return v;
+ });
+ }
+ }
+ insertRelationalTabletByGroup(relationalTabletGroup);
+ }
+
+ @SuppressWarnings({
+ "squid:S3776"
+ }) // ignore Cognitive Complexity of methods should not be too high
+ private <T> void insertRelationalTabletByGroup(
Review Comment:
Where is this type parameter T used?
##########
iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java:
##########
@@ -28,6 +28,7 @@
import org.apache.thrift.TException;
import org.apache.tsfile.write.record.Tablet;
+/** NOTICE: IPooledSession is specific to the table model. */
public interface IPooledSession extends AutoCloseable {
Review Comment:
I have added a @TableModel annotation; you may use that here.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java:
##########
@@ -703,7 +709,16 @@ private ISession getSession() throws
IoTDBConnectionException {
session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs,
deviceIdToEndpoint, availableNodes);
+ if (sqlDialect.equals("tree")) {
+ session.open(
+ enableCompression, connectionTimeoutInMs, deviceIdToEndpoint,
availableNodes);
+ } else {
+ session.open(
+ enableCompression,
+ connectionTimeoutInMs,
+ availableNodes,
+ tableModelDeviceIdToEndpoint);
+ }
Review Comment:
This may not be very robust. If a developer types "treee" by mistake, we
should report the error instead of silently treating it as "table".
A related suggestion: replace these strings with named constants.
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -2697,6 +2790,111 @@ public void insertRelationalTablet(Tablet tablet)
insertRelationalTablet(tablet, false);
}
+ private void insertRelationalTabletWithLeaderCache(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
+ if (tableModelDeviceIdToEndpoint.isEmpty()) {
+ relationalTabletGroup.put(defaultSessionConnection, tablet);
+ } else {
+ for (int i = 0; i < tablet.rowSize; i++) {
+ IDeviceID iDeviceID = tablet.getDeviceID(i);
+ final SessionConnection connection = getSessionConnection(iDeviceID);
+ int finalI = i;
+ relationalTabletGroup.compute(
+ connection,
+ (k, v) -> {
+ if (v == null) {
+ v =
+ new Tablet(
+ tablet.getTableName(),
+ tablet.getSchemas(),
+ tablet.getColumnTypes(),
+ tablet.rowSize);
+ }
+ for (int j = 0; j < v.getSchemas().size(); j++) {
+ v.addValue(
+ v.getSchemas().get(j).getMeasurementId(),
+ v.rowSize,
+ tablet.getValue(finalI, j));
+ }
+ v.addTimestamp(v.rowSize, tablet.timestamps[finalI]);
+ v.rowSize++;
+ return v;
+ });
+ }
+ }
+ insertRelationalTabletByGroup(relationalTabletGroup);
+ }
+
+ @SuppressWarnings({
+ "squid:S3776"
+ }) // ignore Cognitive Complexity of methods should not be too high
+ private <T> void insertRelationalTabletByGroup(
+ Map<SessionConnection, Tablet> relationalTabletGroup)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<CompletableFuture<Void>> completableFutures =
+ relationalTabletGroup.entrySet().stream()
+ .map(
+ entry -> {
+ SessionConnection connection = entry.getKey();
+ Tablet subTablet = entry.getValue();
+ return CompletableFuture.runAsync(
+ () -> {
+ TSInsertTabletReq request =
genTSInsertTabletReq(subTablet, false, false);
+ request.setWriteToTable(true);
+ request.setColumnCategories(
+ subTablet.getColumnTypes().stream()
+ .map(t -> (byte) t.ordinal())
+ .collect(Collectors.toList()));
+ InsertConsumer<TSInsertTabletReq> insertConsumer =
+ SessionConnection::insertTablet;
+ try {
+ insertConsumer.insert(connection, request);
+ } catch (RedirectException e) {
+ List<TEndPoint> endPointList = e.getEndPointList();
+ Map<IDeviceID, TEndPoint> endPointMap = new
HashMap<>();
+ for (int i = 0; i < endPointList.size(); i++) {
+ if (endPointList.get(i) != null) {
+ endPointMap.put(subTablet.getDeviceID(i),
endPointList.get(i));
+ }
+ }
+ endPointMap.forEach(this::handleRedirection);
+ } catch (StatementExecutionException e) {
+ throw new CompletionException(e);
+ } catch (IoTDBConnectionException e) {
+ // remove the broken session
+ removeBrokenSessionConnection(connection);
+ try {
+ insertConsumer.insert(defaultSessionConnection,
request);
+ } catch (IoTDBConnectionException |
StatementExecutionException ex) {
+ throw new CompletionException(ex);
+ } catch (RedirectException ignored) {
+ }
+ }
+ },
+ OPERATION_EXECUTOR);
+ })
+ .collect(Collectors.toList());
+
+ StringBuilder errMsgBuilder = new StringBuilder();
+ for (CompletableFuture<Void> completableFuture : completableFutures) {
+ try {
+ completableFuture.join();
+ } catch (CompletionException completionException) {
+ Throwable cause = completionException.getCause();
+ logger.error("Meet error when async insert!", cause);
+ if (cause instanceof IoTDBConnectionException) {
+ throw (IoTDBConnectionException) cause;
+ } else {
+ errMsgBuilder.append(cause.getMessage());
Review Comment:
Maybe a separator should be added between the messages?
##########
iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderTest.java:
##########
@@ -931,6 +1025,161 @@ public void testInsertTabletsWithSessionBroken() throws
StatementExecutionExcept
}
}
+ @Test
+ public void testInsertRelationalTabletWithSessionBroken() throws
StatementExecutionException {
+ // without leader cache
+ session = new MockSession("127.0.0.1", 55560, false, "table");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+
+ // set the session connection as broken
+ ((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+
+ String tableName = "table1";
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ List<ColumnType> columnTypeList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("id", TSDataType.STRING));
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+ columnTypeList.add(ColumnType.ID);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ columnTypeList.add(ColumnType.MEASUREMENT);
+ Tablet tablet = new Tablet(tableName, schemaList, columnTypeList, 50);
+ long timestamp = System.currentTimeMillis();
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
+ e.getMessage());
+ }
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ if (tablet.rowSize != 0) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
+ e.getMessage());
+ }
+ tablet.reset();
+ }
+
+ assertNull(session.tableModelDeviceIdToEndpoint);
+ assertNull(session.endPointToSessionConnection);
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ // with leader cache
+ // rest the session connection
+ session = new MockSession("127.0.0.1", 55560, true, "table");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ Assert.fail(e.getMessage());
+ }
+ assertEquals(0, session.tableModelDeviceIdToEndpoint.size());
+ assertEquals(1, session.endPointToSessionConnection.size());
+
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
+ e.getMessage());
+ }
+ tablet.reset();
+ }
+ timestamp++;
+ }
+
+ // set the session connection as broken
+ ((MockSession)
session).getLastConstructedSessionConnection().setConnectionBroken(true);
+ // set connection as broken, due to we enable the cache leader, when we
called
+ // ((MockSession) session).getLastConstructedSessionConnection(), the
session's endpoint has
+ // been changed to EndPoint(ip:127.0.0.1, port:55562)
+ Assert.assertEquals(
+ "MockSessionConnection{ endPoint=TEndPoint(ip:127.0.0.1, port:55562)}",
+ ((MockSession)
session).getLastConstructedSessionConnection().toString());
+
+ for (long row = 0; row < 100; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, "id" +
(rowIndex % 4));
+ for (int s = 1; s < 3; s++) {
+ long value = new Random().nextLong();
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ try {
+ session.insertRelationalTablet(tablet);
+ } catch (IoTDBConnectionException e) {
+ assertEquals(
+ "the session connection = TEndPoint(ip:127.0.0.1, port:55560) is
broken",
+ e.getMessage());
+ }
Review Comment:
What if the exception is not thrown?
##########
iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java:
##########
@@ -1384,6 +1440,34 @@ private void handleRedirection(String deviceId,
TEndPoint endpoint) {
}
}
+ private void handleRedirection(IDeviceID deviceId, TEndPoint endpoint) {
+ if (enableRedirection) {
+ // no need to redirection
+ if (endpoint.ip.equals("0.0.0.0")) {
+ return;
+ }
+ AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
+ if (!tableModelDeviceIdToEndpoint.containsKey(deviceId)
+ || !tableModelDeviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+ tableModelDeviceIdToEndpoint.put(deviceId, endpoint);
+ }
+ SessionConnection connection =
+ endPointToSessionConnection.computeIfAbsent(
+ endpoint,
+ k -> {
+ try {
+ return constructSessionConnection(this, endpoint, zoneId);
+ } catch (IoTDBConnectionException ex) {
+ exceptionReference.set(ex);
+ return null;
+ }
+ });
+ if (connection == null) {
+ tableModelDeviceIdToEndpoint.remove(deviceId);
+ }
+ }
Review Comment:
Where is the exceptionReference used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]