jt2594838 opened a new pull request, #11580:
URL: https://github.com/apache/iotdb/pull/11580

   ### Introduction
   This PR introduces Local Split, an optimized procedure for syncing 
**historical TsFiles** from one IoTDB instance to another with Pipe. Here is a 
brief introduction to Local Split and a comparison with the old procedure 
(called Basic).
   
   1. Event generation is performed in batches by a BatchedTsFileExtractor, 
i.e., it generates PipeBatchTsFileInsertionEvents containing more than one 
TsFile. The number of events is thus reduced, and with multiple files in the 
same event, there are more tuning opportunities during the subsequent transfer. 
It also reduces network traffic when each TsFile is small. The size of a batch 
is controlled by two factors: the total size of files (in bytes) and the number 
of files.
   If the total size is so large that timeouts may be triggered, a feedback 
mechanism collects the throughput of successful transfers. After a timeout, it 
adjusts the maximum total file size to the product of the average throughput 
and the timeout length. The recorded throughput history is purged statistically 
to distinguish timeouts triggered by GC stalls from those caused by overly 
large files.
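   The feedback mechanism can be sketched as follows. This is a minimal, hypothetical illustration (class and method names are not the PR's actual code, and the statistical purging of the history is omitted): after a timeout, the maximum batch size is capped at the product of the average observed throughput and the timeout length.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the throughput-feedback batch sizing.
public class BatchSizeFeedback {
    private final Deque<Double> throughputHistory = new ArrayDeque<>(); // bytes/second
    private static final int HISTORY_LIMIT = 32; // assumption, not from the PR
    private long maxBatchBytes;

    public BatchSizeFeedback(long initialMaxBatchBytes) {
        this.maxBatchBytes = initialMaxBatchBytes;
    }

    // Record the throughput of a successfully transferred batch.
    public void recordSuccess(long bytes, double seconds) {
        if (throughputHistory.size() == HISTORY_LIMIT) {
            throughputHistory.removeFirst();
        }
        throughputHistory.addLast(bytes / seconds);
    }

    // After a timeout, shrink the batch so that it could have been
    // transferred within the timeout at the average historical throughput.
    public void onTimeout(double timeoutSeconds) {
        double avg = throughputHistory.stream()
                .mapToDouble(Double::doubleValue).average().orElse(0);
        if (avg > 0) {
            maxBatchBytes = Math.min(maxBatchBytes, (long) (avg * timeoutSeconds));
        }
    }

    public long getMaxBatchBytes() {
        return maxBatchBytes;
    }
}
```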
   
   2. TsFiles are transferred series-by-series instead of file-by-file. TsFiles 
in a batch are merge-read, exploiting the sorted order of timeseries within 
each file. Chunks of the same timeseries are collected and sent to the 
receiver, resulting in only one file on the receiver side for a batched event. 
This increases the locality of a timeseries just like chunk merge in 
compaction, but it does not decompress or decode a chunk unless necessary; 
thus, it is called Shallow Merge. Moreover, as the number of files on the 
receiver is reduced, so are the higher-level metadata and statistics, which may 
result in faster aggregation.
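   The merge-read step can be sketched as a k-way merge over the per-file chunk sequences, which are already sorted by timeseries path. In this hypothetical sketch (the names are illustrative, not the PR's code), chunks stay as opaque byte arrays, i.e., they are never decompressed:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of Shallow Merge's merge-read: group chunks of the
// same timeseries across several TsFiles without decoding them.
public class ShallowMerge {
    // A chunk is represented only by its series path and raw (still
    // compressed) bytes in this sketch.
    record Chunk(String seriesPath, byte[] rawData) {}

    // Merge per-file chunk lists (each sorted by series path) into a map
    // from series path to that series' chunks across all files.
    public static LinkedHashMap<String, List<Chunk>> merge(List<List<Chunk>> files) {
        // Heap entries hold {fileIndex, offsetInFile}; ordered by the series
        // path of the chunk they point at, so chunks come out grouped.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] e) -> files.get(e[0]).get(e[1]).seriesPath()));
        for (int f = 0; f < files.size(); f++) {
            if (!files.get(f).isEmpty()) heap.add(new int[] {f, 0});
        }
        LinkedHashMap<String, List<Chunk>> result = new LinkedHashMap<>();
        while (!heap.isEmpty()) {
            int[] e = heap.poll();
            Chunk c = files.get(e[0]).get(e[1]);
            result.computeIfAbsent(c.seriesPath(), k -> new ArrayList<>()).add(c);
            if (e[1] + 1 < files.get(e[0]).size()) heap.add(new int[] {e[0], e[1] + 1});
        }
        return result;
    }
}
```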
   
   3. Chunks are further compressed. Although each chunk is compressed 
individually by default, with Shallow Merge the chunks of the same timeseries 
are sent in a batch, creating opportunities for further compression. Chunks in 
a batch are compressed with LZ4 by default to reduce network bandwidth 
consumption.
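   The idea is to compress the concatenation of a batch's chunks in one pass, letting the compressor exploit redundancy across chunks that per-chunk compression cannot see. A minimal sketch: the PR uses LZ4 (an external library), so the JDK's Deflater stands in here to keep the example dependency-free.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical sketch of batch-level chunk compression.
public class BatchCompressor {
    // Compress all chunks of a batch as one stream; similar chunks placed
    // together compress better than each chunk compressed alone.
    public static byte[] compressBatch(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos =
                new DeflaterOutputStream(out, new Deflater(Deflater.BEST_SPEED))) {
            for (byte[] chunk : chunks) {
                dos.write(chunk);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot occur with in-memory streams
        }
        return out.toByteArray();
    }
}
```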
   
   4. Local Split to avoid chunk forwarding. In Basic, the receiver may not be 
the actual data holder; it must forward data to the holders. In Local Split, 
the sender queries the partition info from the receiver, performs the chunk 
split locally, and sends chunks directly to the data holders. This avoids a 
possible coordinator hop and unnecessary traffic.
   Considering that cross-cluster bandwidth can be scarce, the sender does not 
send chunks to all replicas of the receiver. Instead, it sends to only one 
replica and lets that replica forward to the others. A throughput-based method 
is applied to select the best relay among the replicas.
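   One way such a throughput-based selection could work (a hypothetical sketch, not the PR's implementation; the smoothing factor is an assumption) is to keep an exponentially weighted moving average of the observed throughput to each replica and relay through the currently fastest one:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of throughput-based relay selection among replicas.
public class RelaySelector {
    private final Map<String, Double> ewmaThroughput = new HashMap<>();
    private static final double ALPHA = 0.3; // smoothing factor (assumption)

    // Record an observed throughput (bytes/second) to one replica.
    public void record(String replica, double bytesPerSecond) {
        ewmaThroughput.merge(replica, bytesPerSecond,
                (old, cur) -> ALPHA * cur + (1 - ALPHA) * old);
    }

    // Return the replica with the highest smoothed throughput, or the
    // given default if nothing has been observed yet.
    public String selectRelay(String defaultReplica) {
        return ewmaThroughput.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(defaultReplica);
    }
}
```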
   
   5. Grouped and parallel chunk sending. Notice that for non-aligned 
timeseries, the order of chunks within a device can be arbitrary. This enables 
parallel processing when sending chunks of the same device. The chunks within a 
device are grouped and sent to the receiver in parallel to maximize resource 
utilization.
   Chunks are grouped according to the similarity of timeseries, considering 
the measurementId, data type, and samples from the data. Since groups are 
compressed as described above, putting similar timeseries together helps 
achieve a higher compression ratio.
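   A minimal sketch of the grouping step (hypothetical names; the real implementation also samples data values, which this sketch omits): chunks of one device are keyed by measurementId and data type, so each group holds series likely to compress well together and can be handed to a separate sender.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of similarity-based chunk grouping within a device.
public class ChunkGrouper {
    // Only the metadata needed for the similarity key is modeled here.
    record ChunkMeta(String measurementId, String dataType) {}

    // Group chunks by (measurementId, dataType); each group is a candidate
    // unit for compression and parallel sending.
    public static Map<String, List<ChunkMeta>> group(List<ChunkMeta> chunks) {
        return chunks.stream().collect(Collectors.groupingBy(
                c -> c.measurementId() + ":" + c.dataType(),
                LinkedHashMap::new,
                Collectors.toList()));
    }
}
```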
   
   ### How to Use
   To use the Local Split feature in a Pipe, the user should add four configs 
when creating a pipe, like the following:
   `create pipe a2b with extractor ('extractor.local-split.enable'='true') with connector ('connector.local-split.enable'='true', 'connector.external.config-nodes'='nelbds-15:11710', 'connector'='iotdb-thrift-connector', 'connector.ip'='nelbds-16', 'connector.port'='7667')`
   
   'extractor.local-split.enable' and 'connector.local-split.enable' must both 
be set to true to enable the feature.
   'connector.external.config-nodes' should be the IP and port of the 
receiver's config node.
   'connector'='iotdb-thrift-connector' is required, as only 
'iotdb-thrift-connector' supports the feature currently.
   
   As implied by Local Split, the sender must have direct access to all nodes 
of the receiver. In other words, the sender should be on the firewall 
whitelist.
   
   Other configs are:
   - 'extractor.split.max-concurrent-file': an integer specifying the maximum 
number of files in an event.
   - 'extractor.split.max-file-batch-size': an integer specifying the maximum 
total size of files in an event, in bytes. The feedback mechanism may reduce 
this after a timeout.
   - 'connector.split.max-size': an integer specifying the maximum number of 
bytes to transfer in a request.
   - 'connector.split.max-concurrent-file': an integer specifying the maximum 
number of files to be Shallow Merged; it should be less than or equal to 
'extractor.split.max-concurrent-file'.
   - 'connector.external.user-name': the user name to be used on the receiver 
side.
   - 'connector.external.password': the password to be used on the receiver 
side.
   
   ### Evaluation
   To perform the evaluation, around 800 GB of data is first written to a 1C1D 
instance and then synchronized to a 1C3D instance through a Pipe. The number 
of replicas is on the x-axis.
   Below is the total task completion time:
   
![image](https://github.com/apache/iotdb/assets/23610645/683fc052-7293-49d9-a323-6b220c0481a0)
   The results show a 50% reduction in completion time compared with Basic.
   
   This is the query latency of aggregating one single timeseries:
   
![image](https://github.com/apache/iotdb/assets/23610645/6aa11080-2893-486a-98bb-84d6f057546c)
   Thanks to Shallow Merge, the increased locality and reduced metadata speed 
up such queries.
   