jixuan1989 commented on a change in pull request #1169:
URL: https://github.com/apache/incubator-iotdb/pull/1169#discussion_r426454615
##########
File path: server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
##########
@@ -74,7 +74,7 @@ statement
| SHOW DEVICES prefixPath? #showDevices
| COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)?
#countTimeseries
| COUNT NODES prefixPath LEVEL OPERATOR_EQ INT #countNodes
- | LOAD CONFIGURATION #loadConfigurationStatement
+ | LOAD CONFIGURATION (MINUS GLOBAL)? #loadConfigurationStatement
Review comment:
any UT?
##########
File path:
server/src/main/java/org/apache/iotdb/db/auth/role/BasicRoleManager.java
##########
@@ -167,4 +168,21 @@ public void reset() {
rtlist.sort(null);
return rtlist;
}
+
+ @Override
+ public void replaceAllRoles(Map<String, Role> roles) throws AuthException {
+ synchronized (this) {
Review comment:
The same to userManager.
##########
File path:
server/src/main/java/org/apache/iotdb/db/auth/user/BasicUserManager.java
##########
@@ -310,4 +311,23 @@ public void setUserUseWaterMark(String username, boolean
useWaterMark) throws Au
throw new AuthException(e);
}
}
+
+
+ @Override
+ public void replaceAllUsers(Map<String, User> users) throws AuthException {
+ synchronized (this) {
Review comment:
use the `lock` field of this class. (writeLock)
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AuthorPlan.java
##########
@@ -125,6 +130,58 @@ public AuthorPlan(AuthorOperator.AuthorType authorType,
String userName, String
}
}
+ public AuthorPlan(OperatorType operatorType) throws IOException {
+ super(false, operatorType);
+
setAuthorType(AuthorType.values()[transformOperatorTypeToAuthorType(operatorType)]);
+ }
+
+ private int transformOperatorTypeToAuthorType(OperatorType operatorType)
Review comment:
if `transformOperatorTypeToAuthorType` is only used by
`setAuthorType(AuthorType.values()[transformOperatorTypeToAuthorType(operatorType)]);`,
why not set the returned type as AuthorType directly?
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1776,6 +1803,22 @@ public void loadNewTsFile(TsFileResource
newTsFileResource) throws LoadFileExcep
}
}
+ /**
+ * Set the version in "partition" to "version" if "version" is larger than
the current version.
+ * @param partition
+ * @param version
+ */
+ public void setPartitionFileVersionToMax(long partition, long version) {
Review comment:
Long partition, Long version
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -523,6 +533,16 @@ private QueryDataSet
getQueryDataSet(List<ShowTimeSeriesResult> timeseriesList)
return listDataSet;
}
+ protected List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan)
+ throws MetadataException {
+ return MManager.getInstance().showTimeseries(plan);
+ }
+
+ protected List<ShowTimeSeriesResult>
showTimeseriesWithIndex(ShowTimeSeriesPlan plan)
Review comment:
What is the difference between `showTimeseries` and
`showTimeseriesWithIndex` ?
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -864,43 +888,78 @@ public void delete(Path path, long timestamp) throws
QueryProcessException {
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
- MNode node = null;
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
- node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
String[] strValues = insertPlan.getValues();
- MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
-
- for (int i = 0; i < measurementList.length; i++) {
- String measurement = measurementList[i];
- if (!node.hasChild(measurement)) {
- if
(!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
- throw new PathNotExistException(deviceId + PATH_SEPARATOR +
measurement);
- }
- TSDataType dataType =
TypeInferenceUtils.getPredictedDataType(strValues[i]);
- Path path = new Path(deviceId, measurement);
- internalCreateTimeseries(path.toString(), dataType);
- }
- LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
- schemas[i] = measurementNode.getSchema();
- // reset measurement to common name instead of alias
- measurementList[i] = measurementNode.getName();
- }
-
- insertPlan.setMeasurements(measurementList);
+ MeasurementSchema[] schemas = getSeriesSchemas(measurementList,
deviceId, strValues);
insertPlan.setSchemas(schemas);
StorageEngine.getInstance().insert(insertPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ }
+ }
+
+ protected MeasurementSchema[] getSeriesSchemas(String[] measurementList,
String deviceId,
+ String[] strValues) throws MetadataException {
+ MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
+
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
Review comment:
For example, if this try-catch throws some RuntimeException (which
PathNotExistException can not catch), then the lock is not released..
##########
File path:
tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
##########
@@ -326,6 +326,15 @@ public static int write(ByteBuffer byteBuffer,
OutputStream outputStream) throws
return len;
}
+ /**
Review comment:
How about merge this class and `SerializeUtils.java `
##########
File path:
server/src/main/java/org/apache/iotdb/db/auth/user/BasicUserManager.java
##########
@@ -310,4 +311,23 @@ public void setUserUseWaterMark(String username, boolean
useWaterMark) throws Au
throw new AuthException(e);
}
}
+
+
+ @Override
+ public void replaceAllUsers(Map<String, User> users) throws AuthException {
+ synchronized (this) {
Review comment:
@neuyilan
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -809,9 +808,10 @@ public void tryToUpdateInsertLastCache(InsertPlan plan,
Long latestFlushedTime)
for (int i = 0; i < measurementList.length; i++) {
// Update cached last value with high priority
MNode measurementNode = node.getChild(measurementList[i]);
-
- ((LeafMNode) measurementNode)
- .updateCachedLast(plan.composeTimeValuePair(i), true,
latestFlushedTime);
+ if (measurementNode != null) {
Review comment:
why check null here? is there a bug before?
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
##########
@@ -201,7 +202,12 @@ public void enterRemoveFile(RemoveFileContext ctx) {
@Override
public void
enterLoadConfigurationStatement(LoadConfigurationStatementContext ctx) {
super.enterLoadConfigurationStatement(ctx);
- initializedOperator = new LoadConfigurationOperator();
+ if(ctx.GLOBAL()!=null){
Review comment:
format the code
##########
File path:
server/src/main/java/org/apache/iotdb/db/auth/user/BasicUserManager.java
##########
@@ -310,4 +311,23 @@ public void setUserUseWaterMark(String username, boolean
useWaterMark) throws Au
throw new AuthException(e);
}
}
+
+
+ @Override
+ public void replaceAllUsers(Map<String, User> users) throws AuthException {
+ synchronized (this) {
+ reset();
+ userMap = users;
Review comment:
Not sure whether `users` having "root" user.
If it has, then `initAdmin()` in `reset()` is meaningless.
If it does not have, then you have to initAdmin() after this replacement.
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
##########
@@ -571,4 +603,56 @@ public long getTimePartitionWithCheck() throws
PartitionViolationException {
}
return partitionId;
}
+
+ /**
+ * Create a hardlink for the TsFile and modification file (if exists)
+ * The hardlink with have a suffix like ".{sysTime}_{randomLong}"
+ * @return a new TsFileResource with its file changed to the hardlink or
null the hardlink
+ * cannot be created.
+ */
+ public TsFileResource createHardlink() {
+ if (!file.exists()) {
+ return null;
+ }
+
+ TsFileResource newResource;
+ try {
+ newResource = new TsFileResource(this);
+ } catch (IOException e) {
+ logger.error("Cannot create hardlink for {}", file, e);
+ return null;
+ }
+
+ while (true) {
+ String hardlinkSuffix = "." + System.currentTimeMillis() + "_" +
random.nextLong();
+ File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix);
+
+ try {
+ Files.createLink(Paths.get(hardlink.getAbsolutePath()),
Paths.get(file.getAbsolutePath()));
+ newResource.setFile(hardlink);
+ if (modFile != null && modFile.exists()) {
+ newResource.setModFile(modFile.createHardlink());
+ }
+ break;
+ } catch (FileAlreadyExistsException e) {
+ // retry a different name if the file is already created
+ } catch (IOException e) {
+ logger.error("Cannot create hardlink for {}", file, e);
+ return null;
+ }
+ }
+ return newResource;
+ }
+
+ public synchronized void setModFile(ModificationFile modFile) {
+ this.modFile = modFile;
+ }
+
+ public long getMaxVersion() {
+ long maxVersion = 0;
+ if (historicalVersions != null) {
+ maxVersion = Collections.max(historicalVersions);
Review comment:
My another concern is, will the historicalVersion set be very large?
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
##########
@@ -241,6 +244,10 @@ public void serialize() throws IOException {
ReadWriteIOUtils.write(historicalVersion, outputStream);
}
}
+
+ if (modFile != null && modFile.exists()) {
+ ReadWriteIOUtils.write(modFile.getFilePath(), outputStream);
Review comment:
IMPORTANT.
don't use the absolute path in a tsfileResource. Otherwise it will be very
hard to help users solving online problems.
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -2215,9 +2275,41 @@ public String getStorageGroupName() {
return storageGroupName;
}
+ /**
+ * Check if the data of "tsFileResource" all exist locally by comparing the
historical versions
+ * in the partition of "partitionNumber". This is available only when the
IoTDB which generated
+ * "tsFileResource" has the same close file policy as the local one.
+ * If one of the version in "tsFileResource" equals to a version of a
working file, false is
+ * also returned because "tsFileResource" may have unwritten data of that
file.
+ * @param tsFileResource
+ * @param partitionNum
+ * @return true if the historicalVersions of "tsFileResource" is a subset of
+ * partitionDirectFileVersions, or false if it is not a subset and it does
not contain any
+ * version of a working file
+ */
public boolean isFileAlreadyExist(TsFileResource tsFileResource, long
partitionNum) {
Review comment:
I think we can mark this function as ForCluster, then the parameter
`tsFileResource ` can be called `remoteTsFileResource`, and then the logic will
be much easier to be understood.
##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -1405,4 +1501,17 @@ public void collectSeries(String startingPath,
List<MeasurementSchema> timeserie
lock.readLock().unlock();
}
}
+
+ public void cacheSchema(String path, MeasurementSchema schema) {
Review comment:
hard to understand...
##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -565,6 +605,14 @@ public TSDataType getSeriesType(String path) throws
MetadataException {
if (path.equals(SQLConstant.RESERVED_TIME)) {
return TSDataType.INT64;
}
+
+ try {
+ MeasurementSchema schema = mRemoteSchemaCache.get(path);
Review comment:
How about if `mRemoteSchemaCache ` has no the path?
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -2215,9 +2275,41 @@ public String getStorageGroupName() {
return storageGroupName;
}
+ /**
+ * Check if the data of "tsFileResource" all exist locally by comparing the
historical versions
+ * in the partition of "partitionNumber". This is available only when the
IoTDB which generated
+ * "tsFileResource" has the same close file policy as the local one.
+ * If one of the version in "tsFileResource" equals to a version of a
working file, false is
+ * also returned because "tsFileResource" may have unwritten data of that
file.
Review comment:
hard to understand the javadoc.
A tsFileResource is attached to a tsFile, what is the meaning of "when
IOTDB has the same close file policy as the local one".
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
##########
@@ -571,4 +603,56 @@ public long getTimePartitionWithCheck() throws
PartitionViolationException {
}
return partitionId;
}
+
+ /**
+ * Create a hardlink for the TsFile and modification file (if exists)
+ * The hardlink with have a suffix like ".{sysTime}_{randomLong}"
Review comment:
will have
##########
File path:
server/src/main/java/org/apache/iotdb/db/query/dataset/ShowTimeSeriesResult.java
##########
@@ -68,4 +79,70 @@ public String getCompressor() {
public Map<String, String> getTagAndAttribute() {
return tagAndAttribute;
}
+
+ @Override
+ public int compareTo(ShowTimeSeriesResult o) {
+ return this.name.compareTo(o.name);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ShowTimeSeriesResult result = (ShowTimeSeriesResult) o;
+ return Objects.equals(name, result.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ public void serialize(OutputStream outputStream) throws IOException {
+ ReadWriteIOUtils.write(name, outputStream);
+ ReadWriteIOUtils.write(alias != null, outputStream);
+ if (alias != null) {
+ ReadWriteIOUtils.write(alias, outputStream);
+ }
+ ReadWriteIOUtils.write(sgName, outputStream);
+ ReadWriteIOUtils.write(dataType, outputStream);
+ ReadWriteIOUtils.write(encoding, outputStream);
+ ReadWriteIOUtils.write(compressor, outputStream);
+
+ ReadWriteIOUtils.write(tagAndAttribute != null, outputStream);
+ if (tagAndAttribute != null) {
+ ReadWriteIOUtils.write(tagAndAttribute.size(), outputStream);
+ for (Entry<String, String> stringStringEntry :
tagAndAttribute.entrySet()) {
+ ReadWriteIOUtils.write(stringStringEntry.getKey(), outputStream);
+ ReadWriteIOUtils.write(stringStringEntry.getValue(), outputStream);
+ }
+ }
+ }
+
+ public static ShowTimeSeriesResult deserialize(ByteBuffer buffer) {
+ ShowTimeSeriesResult result = new ShowTimeSeriesResult();
+ result.name = ReadWriteIOUtils.readString(buffer);
+ if (buffer.get() == 1) {
+ result.alias = ReadWriteIOUtils.readString(buffer);
+ }
+ result.sgName = ReadWriteIOUtils.readString(buffer);
+ result.dataType = ReadWriteIOUtils.readString(buffer);
+ result.encoding = ReadWriteIOUtils.readString(buffer);
+ result.compressor = ReadWriteIOUtils.readString(buffer);
+
+ if (buffer.get() == 1) {
Review comment:
== true?
##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
##########
@@ -813,12 +861,33 @@ private boolean match(String fullPath, String[]
prefixNodes) {
}
}
- public MeasurementSchema getSeriesSchema(String device, String measuremnet)
+ public MeasurementSchema getSeriesSchema(String device, String measurement)
throws MetadataException {
lock.readLock().lock();
try {
InternalMNode node = (InternalMNode) mtree.getNodeByPath(device);
- return ((LeafMNode) node.getChild(measuremnet)).getSchema();
+ MNode leaf = node.getChild(measurement);
+ if (leaf != null) {
+ return ((LeafMNode) leaf).getSchema();
+ } else {
+ return mRemoteSchemaCache
+ .get(device + IoTDBConstant.PATH_SEPARATOR + measurement);
+ }
+ } catch (PathNotExistException e) {
+ try {
+ MeasurementSchema measurementSchema = mRemoteSchemaCache
Review comment:
why retry?
##########
File path: service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
##########
@@ -63,10 +63,19 @@
READ_ONLY_SYSTEM_ERROR(502),
DISK_SPACE_INSUFFICIENT_ERROR(503),
START_UP_ERROR(504),
+
WRONG_LOGIN_PASSWORD_ERROR(600),
NOT_LOGIN_ERROR(601),
NO_PERMISSION_ERROR(602),
UNINITIALIZED_AUTH_ERROR(603),
+
+ // TODO-Cluster: update docs when ready to merge
+ PARTITION_NOT_READY(700),
+ TIME_OUT(701),
Review comment:
what for? maybe we need to separate the time_out between the single node
and cluster.
##########
File path:
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -864,43 +888,78 @@ public void delete(Path path, long timestamp) throws
QueryProcessException {
@Override
public void insert(InsertPlan insertPlan) throws QueryProcessException {
- MNode node = null;
try {
String[] measurementList = insertPlan.getMeasurements();
String deviceId = insertPlan.getDeviceId();
- node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
String[] strValues = insertPlan.getValues();
- MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
-
- for (int i = 0; i < measurementList.length; i++) {
- String measurement = measurementList[i];
- if (!node.hasChild(measurement)) {
- if
(!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
- throw new PathNotExistException(deviceId + PATH_SEPARATOR +
measurement);
- }
- TSDataType dataType =
TypeInferenceUtils.getPredictedDataType(strValues[i]);
- Path path = new Path(deviceId, measurement);
- internalCreateTimeseries(path.toString(), dataType);
- }
- LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
- schemas[i] = measurementNode.getSchema();
- // reset measurement to common name instead of alias
- measurementList[i] = measurementNode.getName();
- }
-
- insertPlan.setMeasurements(measurementList);
+ MeasurementSchema[] schemas = getSeriesSchemas(measurementList,
deviceId, strValues);
insertPlan.setSchemas(schemas);
StorageEngine.getInstance().insert(insertPlan);
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
+ }
+ }
+
+ protected MeasurementSchema[] getSeriesSchemas(String[] measurementList,
String deviceId,
+ String[] strValues) throws MetadataException {
+ MeasurementSchema[] schemas = new
MeasurementSchema[measurementList.length];
+
+ MNode node = null;
+ try {
+ node = mManager.getDeviceNodeWithAutoCreateAndReadLock(deviceId);
Review comment:
why create such a method??
why not let the user call lock and unlock method explicitly. @JackieTien97
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]