SteveYurongSu commented on code in PR #14341:
URL: https://github.com/apache/iotdb/pull/14341#discussion_r1884971779
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeWithLoadIT.java:
##########
@@ -124,4 +130,164 @@ public void testReceiverNotLoadDeletedTimeseries() throws Exception {
TableModelUtils.assertCountData("test", "test", 50, receiverEnv,
handleFailure);
}
}
+
+  // Test that receiver will not load data when table exists but ID columns mismatch.
+  //
+  // The sender and the receiver each create db.t1 with different ID and measurement columns
+  // (id1/id2/s1/s2 on the sender, id3/id4/s3/s4 on the receiver) and insert two rows. After pipe
+  // "p1" has been created, started and given 10 seconds, the receiver's t1 must still contain
+  // only the two rows the receiver inserted itself.
+  @Test
+  public void testReceiverNotLoadWhenIdColumnMismatch() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+    // Flushes both environments; handed to assertDataEventuallyOnEnv as its last argument.
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+        };
+
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> processorAttributes = new HashMap<>();
+    final Map<String, String> connectorAttributes = new HashMap<>();
+
+    extractorAttributes.put("capture.table", "true");
+    extractorAttributes.put("extractor.realtime.mode", "file");
+
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      // Sender side: t1 uses ID columns id1 and id2.
+      try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+          Statement statement = connection.createStatement()) {
+        statement.execute("create database if not exists db");
+        statement.execute("use db");
+        statement.execute(
+            "create table if not exists t1(id1 STRING ID, id2 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
+        statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(1, 'd1', 'd2', 'red', 1)");
+        statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(2, 'd1', 'd2', 'blue', 2)");
+        statement.execute("flush");
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+
+      // Receiver side: a table with the same name but ID columns id3 and id4.
+      try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+          Statement statement = connection.createStatement()) {
+        statement.execute("create database if not exists db");
+        statement.execute("use db");
+        statement.execute(
+            "create table if not exists t1(id3 STRING ID, id4 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
+        statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(1, 'd3', 'd4', 'red2', 10)");
+        statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(2, 'd3', 'd4', 'blue2', 20)");
+        statement.execute("flush");
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      // Wait some time before checking. An InterruptedException is allowed to propagate (the
+      // method declares "throws Exception") rather than being printed and ignored: if the wait
+      // is cut short, the "not transferred" check below could pass without the pipe having had
+      // the intended time to run.
+      Thread.sleep(10_000);
+
+      final Set<String> expectedResSet = new java.util.HashSet<>();
+      expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
+      expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
+      // make sure data are not transferred
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from t1",
+          "time,id3,id4,s3,s4,",
+          expectedResSet,
+          "db",
+          handleFailure);
+    }
+  }
+
+  // Test that receiver can load data when table exists and existing ID columns are the prefix of
+  // incoming ID columns.
+  //
+  // The sender's db.t1 has ID columns id1, id2, id3 and measurements s1, s2; the receiver's
+  // db.t1 has ID columns id1, id2 and measurements s3, s4. After pipe "p1" is created and
+  // started, the receiver's t1 is expected to expose the columns
+  // time,id1,id2,s3,s4,id3,s1,s2 and to contain both its own two rows and the sender's two rows,
+  // with null in the columns a row did not write.
+  @Test
+  public void testReceiverAutoExtendIdColumn() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+    // Flushes both environments; handed to assertDataEventuallyOnEnv as its last argument.
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+        };
+
+    final Map<String, String> extractorAttributes = new HashMap<>();
+    final Map<String, String> processorAttributes = new HashMap<>();
+    final Map<String, String> connectorAttributes = new HashMap<>();
+
+    extractorAttributes.put("capture.table", "true");
+    extractorAttributes.put("extractor.realtime.mode", "file");
+
+    connectorAttributes.put("connector.batch.enable", "false");
+    connectorAttributes.put("connector.ip", receiverIp);
+    connectorAttributes.put("connector.port", Integer.toString(receiverPort));
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+      // Sender side: t1 uses ID columns id1, id2 and id3.
+      try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+          Statement statement = connection.createStatement()) {
+        statement.execute("create database if not exists db");
+        statement.execute("use db");
+        statement.execute(
+            "create table if not exists t1(id1 STRING ID, id2 STRING ID, id3 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
+        statement.execute(
+            "INSERT INTO t1(time,id1,id2,id3,s1,s2) values(1, 'd1', 'd2', 'd3', 'red', 1)");
+        statement.execute(
+            "INSERT INTO t1(time,id1,id2,id3,s1,s2) values(2, 'd1', 'd2', 'd3', 'blue', 2)");
+        statement.execute("flush");
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+
+      // Receiver side: t1 already exists with only id1 and id2 as ID columns.
+      try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
+          Statement statement = connection.createStatement()) {
+        statement.execute("create database if not exists db");
+        statement.execute("use db");
+        statement.execute(
+            "create table if not exists t1(id1 STRING ID, id2 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
+        statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(1, 'd1', 'd2', 'red2', 10)");
+        statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(2, 'd1', 'd2', 'blue2', 20)");
+        statement.execute("flush");
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
+
+      final Set<String> expectedResSet = new java.util.HashSet<>();
+      // Rows written by the sender: s3 and s4 are null.
+      expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,null,null,d3,red,1,");
+      expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,null,null,d3,blue,2,");
+      // Rows written by the receiver itself: id3, s1 and s2 are null.
+      expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,red2,10,null,null,null,");
+      expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,blue2,20,null,null,null,");
+      // make sure data are transferred and column "id3" is auto extended
+      TestUtils.assertDataEventuallyOnEnv(
+          receiverEnv,
+          "select * from t1",
+          "time,id1,id2,s3,s4,id3,s1,s2,",
+          expectedResSet,
+          "db",
+          handleFailure);
+    }
+  }
Review Comment:
More tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]