VGalaxies opened a new pull request, #11203:
URL: https://github.com/apache/iotdb/pull/11203

   ## Background
   
   Consider the following data synchronization scenario (metadata mismatch) using the Pipe engine.
   
   1. Start two IoTDB instances:
   
   - A datanode -> 127.0.0.1:6667
   - B datanode -> 127.0.0.1:6668
   
   **NOTE**: IoTDB B must be configured with the following settings in `iotdb-common.properties`:
   
   ```properties
   enable_partial_insert=false
   enable_auto_create_schema=false
   ```
   
   2. Connect to IoTDB B (6668) via the CLI and run:
   
   ```sql
   create TIMESERIES root.sg.d1.s0 with datatype=float;
   create TIMESERIES root.sg.d1.s1 with datatype=float;
   ```
   
   3. Connect to IoTDB A (6667) via the CLI and run:
   
   ```sql
   create pipe test
   with connector (
       'connector'='iotdb-thrift-connector',
       'connector.ip'='127.0.0.1',
       'connector.port'='6668'
   );
   
   start pipe test;
   
   create TIMESERIES root.sg.d1.s0 with datatype=text;
   create TIMESERIES root.sg.d1.s1 with datatype=float;
   
   insert into root.sg.d1(time, s0, s1) values (3, "one", 2.2);
   ```
   
   ---
   
   On IoTDB B, the following error occurs:
   
   ```plain
   2023-09-23 17:18:53,197 [pool-31-IoTDB-ClientRPC-Processor-3] ERROR o.a.t.ProcessFunction:47 - Internal error processing pipeTransfer
   java.lang.NullPointerException: null
           at org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement.isQuery(PipeEnrichedInsertBaseStatement.java:70)
           at org.apache.iotdb.db.queryengine.plan.analyze.Analyzer.analyze(Analyzer.java:49)
           at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.analyze(QueryExecution.java:310)
           at org.apache.iotdb.db.queryengine.plan.execution.QueryExecution.<init>(QueryExecution.java:170)
           at org.apache.iotdb.db.queryengine.plan.Coordinator.createQueryExecution(Coordinator.java:113)
           at org.apache.iotdb.db.queryengine.plan.Coordinator.execute(Coordinator.java:147)
           at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.executeStatement(IoTDBThriftReceiverV1.java:498)
           at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.handleTransferTabletBatch(IoTDBThriftReceiverV1.java:241)
           at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1.receive(IoTDBThriftReceiverV1.java:111)
           at org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent.receive(IoTDBThriftReceiverAgent.java:43)
           at org.apache.iotdb.db.protocol.thrift.impl.ClientRPCServiceImpl.pipeTransfer(ClientRPCServiceImpl.java:2549)
           at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5295)
           at org.apache.iotdb.service.rpc.thrift.IClientRPCService$Processor$pipeTransfer.getResult(IClientRPCService.java:5275)
           at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
           at org.apache.iotdb.db.protocol.thrift.ProcessorWithMetrics.process(ProcessorWithMetrics.java:64)
           at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:248)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   By debugging and tracing the call chain back to the source of the error:
   
   - `org.apache.iotdb.db.queryengine.plan.analyze.Analyzer#analyze(org.apache.iotdb.db.queryengine.plan.statement.Statement)`
   - `org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor#process`
   - `org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor#visitPipeEnrichedInsert`
   - `org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator#validate(org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher, org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement, org.apache.iotdb.db.queryengine.common.MPPQueryContext)`
   - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#updateAfterSchemaValidation`
   - `org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement#transferType`
   
   we found that the error originates in `transferType`, where `CommonUtils.parseValue` throws a `QueryProcessException`. This exception is caught by `validateSchema`, which sets the `finishQueryAfterAnalyze` field of the `analysis` object to true. As a consequence, the `statement` field is left null, which ultimately triggers the `NullPointerException`.
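   To make the failure path easier to follow, here is a simplified, self-contained model of it. The nested classes are hypothetical stand-ins, not IoTDB's real classes, and the model assumes that after analysis the wrapper's inner statement is replaced by the analyzed statement (which is null when analysis finished early):

   ```java
   // Hypothetical, simplified model of the failure path; these nested classes
   // are NOT IoTDB's real classes, they only mirror the shape of the problem.
   class NpeModel {
     static final class InnerInsert {
       boolean isQuery() {
         return false; // an insert is a write, not a query
       }
     }

     static final class PipeEnrichedWrapper {
       InnerInsert inner = new InnerInsert();

       // Delegates without a null check, like the top frame of the stack trace.
       boolean isQuery() {
         return inner.isQuery();
       }
     }

     static final class AnalysisModel {
       boolean finishQueryAfterAnalyze;
       InnerInsert statement; // stays null when analysis finishes early
     }

     // Assumption: the wrapper's inner statement is overwritten with the
     // analyzed statement, which is null when schema validation failed.
     static AnalysisModel analyze(PipeEnrichedWrapper wrapper, boolean validationFails) {
       AnalysisModel analysis = new AnalysisModel();
       if (validationFails) {
         analysis.finishQueryAfterAnalyze = true;
       } else {
         analysis.statement = new InnerInsert();
       }
       wrapper.inner = analysis.statement;
       return analysis;
     }
   }
   ```

   Calling `isQuery()` on the wrapper after `analyze(wrapper, true)` reproduces the NPE in miniature.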
   
   This PR fixes this NPE and **provides more accurate error information to both the sender (IoTDB A) and the receiver (IoTDB B)**.
   
   ## Description
   
   Note that the default value of `enable_partial_insert` in `iotdb-common.properties` is true. With the default configuration, **running the above data synchronization scenario does NOT trigger a NullPointerException (NPE)**, because in that case `transferType` does not rethrow the `QueryProcessException`; it marks the measurement as failed instead.
   
   ```java
   try {
     values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
   } catch (Exception e) {
     LOGGER.warn(
         "data type of {}.{} is not consistent, "
             + "registered type {}, inserting timestamp {}, value {}",
         devicePath,
         measurements[i],
         dataTypes[i],
         time,
         values[i]);
     if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) { // <-- NOTE HERE
       throw e;
     } else {
       markFailedMeasurement(i, e);
     }
   }
   ```
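   The branch above can be exercised in isolation with the sketch below. `parseValue` here is a hypothetical stand-in that only understands FLOAT and TEXT (it is not `CommonUtils.parseValue`), and failures are recorded in plain lists rather than through IoTDB's `markFailedMeasurement`:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch of the rethrow-or-mark branch in transferType.
   class TransferTypeSketch {
     enum DataType { FLOAT, TEXT }

     final List<Integer> failedIndexes = new ArrayList<>();
     final List<String> failedMessages = new ArrayList<>();

     // Stand-in for CommonUtils.parseValue: only FLOAT and TEXT are supported.
     static Object parseValue(DataType type, String raw) {
       switch (type) {
         case FLOAT:
           return Float.parseFloat(raw); // "one" -> NumberFormatException
         case TEXT:
           return raw;
         default:
           throw new IllegalArgumentException("unsupported type " + type);
       }
     }

     // Converts each value to its registered type. With partial insert disabled
     // the parse error propagates; otherwise the failure is recorded and the
     // loop continues with the next measurement.
     void transferType(DataType[] registered, Object[] values, boolean enablePartialInsert) {
       for (int i = 0; i < values.length; i++) {
         try {
           values[i] = parseValue(registered[i], values[i].toString());
         } catch (RuntimeException e) {
           if (!enablePartialInsert) {
             throw e; // in IoTDB, this is the path that ends in the receiver-side NPE
           }
           failedIndexes.add(i);
           failedMessages.add(e.getMessage());
         }
       }
     }
   }
   ```

   With the scenario's values (`s0` registered as FLOAT on IoTDB B but receiving `"one"`, `s1` receiving `2.2`), only index 0 fails when partial insert is enabled, and the whole call throws when it is disabled.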
   
   The `markFailedMeasurement` path also causes the analysis to fail, but it does not set `finishQueryAfterAnalyze` to true (why?) 🤔:
   
   ```java
   if (hasFailedMeasurement) {
     partialInsertMessage =
         String.format(
             "Fail to insert measurements %s caused by %s",
             insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages());
     logger.warn(partialInsertMessage);
     analysis.setFailStatus(
         RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage));
   }
   ```
   
   In this case, transitioning the execution state machine to the failed state is reasonable:
   
   ```java
   if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
     stateMachine.transitionToFailed(analysis.getFailStatus());
   }
   ```
   
   and the following warning logs are recorded:
   
   ```plain
   2023-09-23 17:42:25,714 [pool-31-IoTDB-ClientRPC-Processor-3] WARN  o.a.i.d.p.r.t.IoTDBThriftReceiverV1:507 - failed to execute statement, statement: org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement@2a3b3d83, result status is: TSStatus(code:507, message:Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one])
   2023-09-23 17:42:25,720 [pool-31-IoTDB-ClientRPC-Processor-3$20230923_094225_00004_1] WARN  o.a.i.d.q.p.a.AnalyzeVisitor:2693 - Fail to insert measurements [s0] caused by [data type of root.sg.d1.s0 is not consistent, registered type FLOAT, inserting type TEXT, timestamp 3, value one]
   ```
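   The interplay between the fail status and the state machine can be modeled as follows. All types are hypothetical; the message format is copied from the `partialInsertMessage` snippet, and the status code 507 is the one visible in the receiver's log rather than a value read from `TSStatusCode`:

   ```java
   import java.util.List;

   // Hypothetical model of how a failed analysis moves a WRITE query to FAILED.
   class FailStatusSketch {
     enum QueryType { READ, WRITE }

     enum State { RUNNING, FAILED }

     static final class Status {
       final int code;
       final String message;

       Status(int code, String message) {
         this.code = code;
         this.message = message;
       }
     }

     static final class StateMachine {
       State state = State.RUNNING;
       Status failure;

       void transitionToFailed(Status status) {
         state = State.FAILED;
         failure = status;
       }
     }

     // Builds the fail status from the recorded failures; null means "not failed".
     static Status failStatusFor(List<String> failedMeasurements, List<String> failedMessages) {
       if (failedMeasurements.isEmpty()) {
         return null;
       }
       String message =
           String.format(
               "Fail to insert measurements %s caused by %s", failedMeasurements, failedMessages);
       return new Status(507, message); // 507 as seen in the receiver's log
     }

     // Mirrors the check after analysis: only failed WRITE queries transition.
     static void afterAnalyze(QueryType type, Status failStatus, StateMachine stateMachine) {
       if (type == QueryType.WRITE && failStatus != null) {
         stateMachine.transitionToFailed(failStatus);
       }
     }
   }
   ```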
   
   Therefore, when `enable_partial_insert` is set to false, it is sufficient to avoid the NPE **directly** to achieve the expected behavior.
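   The description does not include the patch itself, so the following is only one possible shape of such a direct guard, using hypothetical stand-in classes: when the wrapped statement is missing, the receiver reports the recorded failure back to the sender instead of dereferencing null.

   ```java
   // Hypothetical sketch of a direct guard; this is NOT the patch from this PR.
   class DirectGuardSketch {
     static final class InnerInsert {
       boolean isQuery() {
         return false;
       }
     }

     static final class PipeEnrichedWrapper {
       private final InnerInsert inner;

       PipeEnrichedWrapper(InnerInsert inner) {
         this.inner = inner;
       }

       boolean hasInner() {
         return inner != null;
       }

       // Null-safe delegation: no inner statement means "not a query".
       boolean isQuery() {
         return inner != null && inner.isQuery();
       }
     }

     // Returns the status text the receiver would send back to the pipe sender.
     static String receive(PipeEnrichedWrapper wrapper, String failMessage) {
       if (!wrapper.hasInner()) {
         return "FAILED: " + failMessage; // explicit error instead of an NPE
       }
       return wrapper.isQuery() ? "QUERY" : "SUCCESS";
     }
   }
   ```

   Returning an explicit failure keeps the real cause (the type mismatch) visible on both sides, whereas the NPE surfaced only as "Internal error processing pipeTransfer".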
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
       - [ ] concurrent read
       - [ ] concurrent write
       - [ ] concurrent read and write 
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. 
   - [ ] added or updated version, __license__, or notice information
   - [ ] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage.
   - [ ] added integration tests.
   - [x] been tested in a test IoTDB cluster.
   
   <hr>
   
   ##### Key changed/added classes (or packages if there are too many classes) in this PR
   

