JackieTien97 commented on code in PR #16829:
URL: https://github.com/apache/iotdb/pull/16829#discussion_r2575831750
##########
iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties:
##########
@@ -34,7 +34,7 @@ timestamp_precision=ms
data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=1
Review Comment:
change it back
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java:
##########
@@ -40,6 +58,117 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength :
length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean
mayNotInCurrentNode) {
+ Optional<File> objectFile =
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset,
readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long
offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
+ Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+ List<TRegionReplicaSet> regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ TRegionReplicaSet regionReplicaSet =
regionReplicaSetList.iterator().next();
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation =
regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to read object from datanode: {}",
dataNodeLocation, e);
Review Comment:
```suggestion
logger.warn("Failed to read object from datanode: {}",
dataNodeLocation, e);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java:
##########
@@ -40,6 +58,117 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength :
length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean
mayNotInCurrentNode) {
+ Optional<File> objectFile =
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset,
readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long
offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
+ Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+ List<TRegionReplicaSet> regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ TRegionReplicaSet regionReplicaSet =
regionReplicaSetList.iterator().next();
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation =
regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to read object from datanode: {}",
dataNodeLocation, e);
+ if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.clear();
+ req.setOffset(offset);
Review Comment:
if you want to read from start, req.setOffset(offset) is also needless.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java:
##########
@@ -40,6 +58,117 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength :
length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean
mayNotInCurrentNode) {
+ Optional<File> objectFile =
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset,
readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long
offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
+ Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+ List<TRegionReplicaSet> regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ TRegionReplicaSet regionReplicaSet =
regionReplicaSetList.iterator().next();
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation =
regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to read object from datanode: {}",
dataNodeLocation, e);
+ if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.clear();
+ req.setOffset(offset);
Review Comment:
We can continue from the last break point?
##########
integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.utils.Binary;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.LocalDate;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectTypeQueryIT {
Review Comment:
move this class to
integration-test/src/test/java/org/apache/iotdb/relational/it/query/object
package
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java:
##########
@@ -40,6 +58,117 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = objectLengthPathPair.getLeft();
+ String relativePath = objectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength :
length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean
mayNotInCurrentNode) {
+ Optional<File> objectFile =
TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset,
readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long
offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(),
StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ ByteBuffer buffer = ByteBuffer.allocate(readSize);
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
+ Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+ List<TRegionReplicaSet> regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ TRegionReplicaSet regionReplicaSet =
regionReplicaSetList.iterator().next();
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation =
regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to read object from datanode: {}",
dataNodeLocation, e);
+ if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+ throw new IoTDBRuntimeException(e,
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
Review Comment:
We've already record the real thread exception in warn log, here we only
need a new error code and a error msg to notify users that this problem.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java:
##########
@@ -107,35 +100,14 @@ private void transform(Column column, ColumnBuilder
columnBuilder, int i) {
}
private Binary readObject(Binary binary) {
- File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
- long actualReadSize = getActualReadSize(file);
+ Pair<Long, String> ObjectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
Review Comment:
```suggestion
Pair<Long, String> objectLengthPathPair =
ObjectTypeUtils.parseObjectBinary(binary);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]