Caideyipi commented on code in PR #12018:
URL: https://github.com/apache/iotdb/pull/12018#discussion_r1475727953
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java:
##########
@@ -138,17 +185,60 @@ public void testPipeAlterFailure() {
} catch (SQLException e) {
fail(e.getMessage());
}
+ }
- // useless alter
- sql =
+ @Test
+ public void testAlterPipeProcessor() {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // create pipe
+ String sql =
String.format(
- "alter pipe a2b modify sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with processor
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1',
'down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
receiverDataNode.getIpAndPortString());
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // insert history data on sender
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)",
+ "flush"))) {
fail();
- } catch (SQLException ignore) {
}
+
+ // check data on receiver
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1000,1.0,");
+ expectedResSet.add("2000,3.0,");
+ expectedResSet.add("3000,5.0,");
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.at1,",
expectedResSet);
+
+ // clear data on receiver
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv, Collections.singletonList("delete from root.**"))) {
+ fail();
+ }
+
+ // alter pipe (modify 'down-sampling.interval-seconds')
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify processor
('down-sampling.interval-seconds'='2')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // check data on receiver
+ expectedResSet.clear();
+ expectedResSet.add("1000,1.0,");
+ expectedResSet.add("3000,5.0,");
+ TestUtils.assertDataOnEnv(
Review Comment:
BTW, though the progressIndex is preserved on CN, the DN inherit logic still
needs to be preserved to retransmit less data... I did not notice that in the
last PR...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]