Caideyipi commented on code in PR #12018:
URL: https://github.com/apache/iotdb/pull/12018#discussion_r1475709855
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java:
##########
@@ -138,17 +185,60 @@ public void testPipeAlterFailure() {
} catch (SQLException e) {
fail(e.getMessage());
}
+ }
- // useless alter
- sql =
+ @Test
+ public void testAlterPipeProcessor() {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // create pipe
+ String sql =
String.format(
- "alter pipe a2b modify sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with processor
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1',
'down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
receiverDataNode.getIpAndPortString());
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // insert history data on sender
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)",
+ "flush"))) {
fail();
- } catch (SQLException ignore) {
}
+
+ // check data on receiver
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1000,1.0,");
+ expectedResSet.add("2000,3.0,");
+ expectedResSet.add("3000,5.0,");
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.at1,",
expectedResSet);
+
+ // clear data on receiver
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ receiverEnv, Collections.singletonList("delete from root.**"))) {
+ fail();
+ }
+
+ // alter pipe (modify 'down-sampling.interval-seconds')
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify processor
('down-sampling.interval-seconds'='2')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // check data on receiver
+ expectedResSet.clear();
+ expectedResSet.add("1000,1.0,");
+ expectedResSet.add("3000,5.0,");
+ TestUtils.assertDataOnEnv(
Review Comment:
The previous progressIndex is reported to the local PipeTaskMeta on
PipeTaskDataNodeAgent, so the "alter" logic can see it. Although the data
might be sent again when it is flushed to a single tsFile, relying on
"retransmission" may still not be a good idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]