Caideyipi commented on code in PR #14168:
URL: https://github.com/apache/iotdb/pull/14168#discussion_r1893617782
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java:
##########
@@ -144,11 +143,17 @@ private boolean tryFilter(final long timestamp, final T
value) {
return false;
}
- private void reset(final long timestamp, final T value) {
+ public void reset(final long arrivalTime, final long timestamp, final Object
value) {
upperDoor = Double.MIN_VALUE;
lowerDoor = Double.MAX_VALUE;
- lastStoredTimestamp = timestamp;
+ lastPointEventTime = timestamp;
lastStoredValue = value;
+
+ lastPointArrivalTime = arrivalTime;
+ }
+
+ public long estimatedMemory() {
+ return estimatedMemory + 64;
Review Comment:
Better only compute it once in the static variable?
##########
integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java:
##########
@@ -144,4 +144,167 @@ public void testTumblingTimeSamplingProcessor() throws
Exception {
receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,",
expectedResSet);
}
}
+
+ @Test
+ public void testChangingValueProcessor() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ final String receiverIp = receiverDataNode.getIp();
+ final int receiverPort = receiverDataNode.getPort();
+
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ // Test empty tsFile parsing
+ // Assert that an empty tsFile will not be parsed by the processor then
block
+ // the subsequent data processing
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.vehicle.d0(time, s1) values (0, 1)", "delete
from root.**"))) {
+ return;
+ }
+
+ final Map<String, String> extractorAttributes = new HashMap<>();
+ final Map<String, String> processorAttributes = new HashMap<>();
+ final Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("source.realtime.mode", "log");
+
+ processorAttributes.put("processor",
"changing-value-sampling-processor");
+
processorAttributes.put("processor.changing-value.compression-deviation", "10");
+ processorAttributes.put("processor.changing-value.min-time-interval",
"10000");
+ processorAttributes.put("processor.changing-value.max-time-interval",
"20000");
+
+ connectorAttributes.put("sink", "iotdb-thrift-sink");
+ connectorAttributes.put("sink.batch.enable", "false");
+ connectorAttributes.put("sink.ip", receiverIp);
+ connectorAttributes.put("sink.port", Integer.toString(receiverPort));
+
+ final TSStatus status =
+ client.createPipe(
+ new TCreatePipeReq("testPipe", connectorAttributes)
+ .setExtractorAttributes(extractorAttributes)
+ .setProcessorAttributes(processorAttributes));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
+
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.vehicle.d0(time, s1) values (0, 10)",
+ "insert into root.vehicle.d0(time, s1) values (9999, 20)",
+ "insert into root.vehicle.d0(time, s1) values (10000, 30)",
+ "insert into root.vehicle.d0(time, s1) values (19000, 40)",
+ "insert into root.vehicle.d0(time, s1) values (20000, 50)",
+ "insert into root.vehicle.d0(time, s1) values (29001, 60)",
+ "insert into root.vehicle.d0(time, s1) values (50000, 70)",
+ "insert into root.vehicle.d0(time, s1) values (60000, 71)",
+ "flush"))) {
+ return;
+ }
+
+ final Set<String> expectedResSet = new HashSet<>();
+
+ expectedResSet.add("0,10.0,");
+ expectedResSet.add("10000,30.0,");
+ expectedResSet.add("20000,50.0,");
+ expectedResSet.add("50000,70.0,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.vehicle.d0.s1,",
expectedResSet);
+ }
+ }
+
+ @Test
+ public void testChangingPointProcessor() throws Exception {
Review Comment:
Better add test to "SDT"?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java:
##########
@@ -79,23 +76,28 @@ private boolean tryFilter(final long timestamp, final T
value) {
return false;
}
- reset(timestamp, value);
+ reset(arrivalTime, timestamp, value);
return true;
}
// For other numerical types, we compare the value difference
if (Math.abs(
Double.parseDouble(lastStoredValue.toString()) -
Double.parseDouble(value.toString()))
- > processor.getCompressionDeviation()) {
- reset(timestamp, value);
+ > compressionDeviation) {
+ reset(arrivalTime, timestamp, value);
return true;
}
return false;
}
- private void reset(final long timestamp, final T value) {
- lastStoredTimestamp = timestamp;
+ public void reset(final long arrivalTime, final long timestamp, final Object
value) {
+ lastPointArrivalTime = arrivalTime;
+ lastPointEventTime = timestamp;
lastStoredValue = value;
}
+
+ public long estimatedMemory() {
+ return estimatedMemory + 64;
Review Comment:
Better only compute it once in the static variable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]