VGalaxies opened a new pull request, #11203:
URL: https://github.com/apache/iotdb/pull/11203
## Background
Consider the following data synchronization scenario (metadata mismatch)
using the Pipe engine.
1. Start two instances of IoTDB:
- A datanode -> 127.0.0.1:6667
- B datanode -> 127.0.0.1:6668
**NOTE**: IoTDB B should be configured with the following
`iotdb-common.properties`:
```properties
enable_partial_insert=false
enable_auto_create_schema=false
```
2. Connect to IoTDB B (6668) with the CLI and run:
```sql
create TIMESERIES root.sg.d1.s0 with datatype=float;
create TIMESERIES root.sg.d1.s1 with datatype=float;
```
3. Connect to IoTDB A (6667) with the CLI and run:
```sql
create pipe test
with connector (
'connector'='iotdb-thrift-connector',
'connector.ip'='127.0.0.1',
'connector.port'='6668'
);
start pipe test;
create TIMESERIES root.sg.d1.s0 with datatype=text;
create TIMESERIES root.sg.d1.s1 with datatype=float;
insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2);
```
---
In IoTDB B, the following errors occur:
```plain
2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer
java.lang.NullPointerException: null
    at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70)
    at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49)
    at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310)
    at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170)
    at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113)
    at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111)
    at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43)
    at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549)
    at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295)
    at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
    at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
```
By debugging and tracing back to the source of the error through this call chain:
- `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)`
- `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process`
- `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert`
- `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)`
- `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation`
- `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType`

we found that the error originates in `CommonUtils.parseValue`, called from
`transferType`, which throws a `QueryProcessException`. This exception is
caught by `validateSchema`, which sets the `finishQueryAfterAnalyze` field of the
`analysis` object to true. As a result, the `statement` field is left null,
which ultimately triggers the `NullPointerException`.
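
The failure chain described above can be modeled in miniature. This is only a sketch: `SketchStatement`, `SketchAnalysis` and `NpeChainSketch` are hypothetical stand-ins rather than IoTDB classes, `Float.parseFloat` stands in for the failing type conversion, and the assumption that the statement is simply never assigned after the early exit is a simplification of whatever the real code path does.

```java
// Hypothetical stand-ins for illustration; these are not the real IoTDB classes.
class SketchStatement {
  boolean isQuery() {
    return false; // an insert is a write, not a query
  }
}

class SketchAnalysis {
  boolean finishQueryAfterAnalyze; // set when validation aborts the analysis
  SketchStatement statement; // assumption: left null when analysis ends early
}

class NpeChainSketch {
  // Models schema validation: a raw value is checked against a FLOAT series.
  static SketchAnalysis analyze(String rawValue) {
    SketchAnalysis analysis = new SketchAnalysis();
    try {
      Float.parseFloat(rawValue); // stands in for the failing type conversion
    } catch (NumberFormatException e) {
      analysis.finishQueryAfterAnalyze = true;
      return analysis; // early exit: statement is never assigned
    }
    analysis.statement = new SketchStatement();
    return analysis;
  }

  // Mirrors a caller that dereferences the statement without a null check.
  static String describe(String rawValue) {
    SketchAnalysis analysis = analyze(rawValue);
    try {
      return "isQuery=" + analysis.statement.isQuery();
    } catch (NullPointerException e) {
      return "NPE, finishQueryAfterAnalyze=" + analysis.finishQueryAfterAnalyze;
    }
  }

  public static void main(String[] args) {
    System.out.println(describe("2.2")); // well-typed value
    System.out.println(describe("one")); // TEXT value sent to a FLOAT series
  }
}
```

In this model the well-typed value passes through normally, while the mismatched value ends in an NPE instead of a descriptive error, which is the symptom seen on IoTDB B.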
The purpose of this PR is to fix this NPE, **providing more accurate
error information for both the sender (IoTDB A) and the receiver (IoTDB B)**.
## Description
Notice that in `iotdb-common.properties`, the default value of
`enable_partial_insert` is true. With the default configuration, **the
above data synchronization scenario does NOT trigger a NullPointerException
(NPE)**, because in that case the `transferType` method
does not rethrow the `QueryProcessException`.
```java
try {
  values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
} catch (Exception e) {
  LOGGER.warn(
      "data type of {}.{} is not consistent, "
          + "registered type {}, inserting timestamp {}, value {}",
      devicePath,
      measurements[i],
      dataTypes[i],
      time,
      values[i]);
  if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE
    throw e;
  } else {
    markFailedMeasurement(i, e);
  }
}
```
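
The two branches can be exercised in isolation with a small self-contained sketch. Everything here is illustrative: `PartialInsertSketch`, `convertAll` and `failedIndexes` are hypothetical names, and `Float.parseFloat` again stands in for the real conversion. The only behavior taken from the excerpt above is the choice between rethrowing and recording the failed measurement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the names below are illustrative, not IoTDB APIs.
class PartialInsertSketch {
  final List<Integer> failedIndexes = new ArrayList<>();

  // Converts each raw value to a float. On failure, either rethrows (strict mode)
  // or records the failed index and continues (partial insert mode).
  Float[] convertAll(String[] rawValues, boolean enablePartialInsert) {
    Float[] converted = new Float[rawValues.length];
    for (int i = 0; i < rawValues.length; i++) {
      try {
        converted[i] = Float.parseFloat(rawValues[i]);
      } catch (NumberFormatException e) {
        if (!enablePartialInsert) {
          throw e; // the whole insert fails
        }
        failedIndexes.add(i); // the remaining measurements are still processed
      }
    }
    return converted;
  }

  public static void main(String[] args) {
    String[] row = {"one", "2.2"};

    PartialInsertSketch lenient = new PartialInsertSketch();
    Float[] values = lenient.convertAll(row, true);
    System.out.println("failed indexes: " + lenient.failedIndexes + ", s1 = " + values[1]);

    try {
      new PartialInsertSketch().convertAll(row, false);
    } catch (NumberFormatException e) {
      System.out.println("strict mode rethrows: " + e.getMessage());
    }
  }
}
```

With partial insert enabled, the mismatched `s0` is recorded as failed and `s1` is still converted; with it disabled, the first mismatch aborts the whole row.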
The `markFailedMeasurement` method also causes the analysis to be marked as
failed, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔.
```java
if (hasFailedMeasurement) {
  partialInsertMessage =
      String.format(
          "Fail to insert measurements %s caused by %s",
          insertStatement.getFailedMeasurements(),
          insertStatement.getFailedMessages());
  logger.warn(partialInsertMessage);
  analysis.setFailStatus(
      RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
}
```
In this situation, the execution state machine's transition to a failed
state is reasonable:
```java
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
  stateMachine.transitionToFailed(analysis.getFailStatus());
}
```
and warning logs are recorded as below:
```plain
2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one])
2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]
```
Therefore, when `enable_partial_insert` is set to false, we only need to
avoid the NPE **directly** to achieve the expected behavior.
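
One possible shape of such a direct guard is sketched below. It is an assumption for illustration, not the change made in this PR: `GuardedStatementSketch`, its nested `Statement` interface and `PipeEnrichedWrapper` are hypothetical, and treating a missing inner statement as "not a query" is just one plausible policy.

```java
// Hypothetical sketch of a null-safe delegation; the actual change in the PR may differ.
class GuardedStatementSketch {
  interface Statement {
    boolean isQuery();
  }

  // Wrapper that forwards isQuery() to an inner statement.
  static final class PipeEnrichedWrapper implements Statement {
    private final Statement inner; // assumption: may be null when analysis ended early

    PipeEnrichedWrapper(Statement inner) {
      this.inner = inner;
    }

    @Override
    public boolean isQuery() {
      // Treat a missing inner statement as "not a query" instead of dereferencing null.
      return inner != null && inner.isQuery();
    }
  }

  public static void main(String[] args) {
    System.out.println(new PipeEnrichedWrapper(null).isQuery());
    System.out.println(new PipeEnrichedWrapper(() -> true).isQuery());
  }
}
```

With a guard of this kind the call no longer throws, so the failure can surface through the analysis fail status rather than as an internal error.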
<hr>
This PR has:
- [x] been self-reviewed.
- [ ] concurrent read
- [ ] concurrent write
- [ ] concurrent read and write
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods.
- [ ] added or updated version, __license__, or notice information
- [ ] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage.
- [ ] added integration tests.
- [x] been tested in a test IoTDB cluster.
<hr>
##### Key changed/added classes (or packages if there are too many classes) in this PR