sashapolo commented on a change in pull request #402:
URL: https://github.com/apache/ignite-3/pull/402#discussion_r731674947
##########
File path:
examples/src/test/java/org/apache/ignite/example/sql/jdbc/SqlExamplesTest.java
##########
@@ -61,12 +64,7 @@ public void testSqlJdbcExample() throws Exception {
}
@BeforeEach
- private void startNode() throws IOException {
- Path workDir = Path.of("my-first-node-work");
-
- if (Files.exists(workDir))
- IgniteUtils.deleteIfExists(workDir);
-
+ private void startNode(@WorkDirectory Path workDir) throws IOException {
Review comment:
`@BeforeEach` methods must not be private. Same applies to `@AfterEach`
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorage.java
##########
@@ -83,6 +83,15 @@ public RocksDbPartitionStorage(RocksDB db, ColumnFamily
columnFamily) throws Sto
this.data = columnFamily;
}
+ /**
+ * Returns ColumnFamily instance associated with the partition.
Review comment:
```suggestion
* Returns the ColumnFamily instance associated with the partition.
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
Review comment:
```suggestion
* @throws StorageException If an error has occurred during the
partition creation.
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -45,4 +81,11 @@
* @throws StorageException If something went wrong.
*/
void stop() throws StorageException;
+
+ /**
+ * Stops and destroys the storage and cleans all allocated resources.
+ *
+ * @throws StorageException If something went wrong.
Review comment:
This doesn't look good =)
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -199,16 +209,19 @@ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
}
/** {@inheritDoc} */
- @Override public PartitionStorage getOrCreatePartition(int partId) {
- assert partId < partitions : S.toString(
- "Attempt to create partition with id outside of configured range",
- "partitionId", partId, false,
- "partitions", partitions, false
- );
+ @Override public void destroy() throws StorageException {
+ stop();
- ColumnFamily partitionCf = partitionCfs.get(partId);
+ IgniteUtils.deleteIfExists(tablePath);
Review comment:
Maybe the `TableStorage` should also create the `tablePath` since it
deletes it?
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -96,10 +98,10 @@
private ColumnFamilyHandle metaCfHandle;
/** Column families for partitions. Stored as an array for the quick
access by an index. */
- private AtomicReferenceArray<ColumnFamily> partitionCfs;
+ private AtomicReferenceArray<RocksDbPartitionStorage> partitions;
/** Max number of partitions in the table. */
- private int partitions;
+ private int maxPartitions;
/** Column families for indexes by their names. */
private Map<String, ColumnFamilyHandle> indicesCfHandles = new
ConcurrentHashMap<>();
Review comment:
```suggestion
private final Map<String, ColumnFamilyHandle> indicesCfHandles = new
ConcurrentHashMap<>();
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
Review comment:
```suggestion
* Returns the partition storage or {@code null} if the requested
storage doesn't exist.
```
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
##########
@@ -193,6 +198,45 @@ else if (mapper == null)
}
}
+ /**
+ * Converts iterator to stream.
+ *
+ * @param iterator Iterator.
+ * @param <T> Type of elements in iterator.
+ * @return Stream.
+ */
+ public static <T> Stream<T> iteratorToStream(Iterator<T> iterator) {
Review comment:
I think that this method can simply be called `stream` or `toStream`
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
+ *
+ * @param partId Partition id.
+ * @return Partition storage or {@code null}.
+ * @throws IllegalArgumentException If partition id is invalid.
+ */
+ PartitionStorage getPartition(int partId);
+
+ /**
+ * Destroys partition if it exists.
Review comment:
```suggestion
* Destroys a partition if it exists.
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
Review comment:
```suggestion
* Retrieves or creates a partition for the current table. Not expected
to be called concurrently with the same
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
+ *
+ * @param partId Partition id.
+ * @return Partition storage or {@code null}.
+ * @throws IllegalArgumentException If partition id is invalid.
+ */
+ PartitionStorage getPartition(int partId);
Review comment:
I guess this method should be annotated with `@Nullable`
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
##########
@@ -193,6 +198,45 @@ else if (mapper == null)
}
}
+ /**
+ * Converts iterator to stream.
+ *
+ * @param iterator Iterator.
+ * @param <T> Type of elements in iterator.
+ * @return Stream.
+ */
+ public static <T> Stream<T> iteratorToStream(Iterator<T> iterator) {
+ return stream(spliteratorUnknownSize(iterator, Spliterator.ORDERED),
false);
+ }
+
+ /**
+ * Returns iterator that iterates list in reverse order.
+ *
+ * @param list List.
+ * @param <T> Type of elements in list.
+ * @return Reverse iterator.
+ */
+ public static <T> Iterator<T> reverseIterator(List<? extends T> list) {
+ return new Iterator<T>() {
+ private final ListIterator<? extends T> iter =
list.listIterator(list.size());
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return iter.hasPrevious();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T next() {
+ return iter.previous();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ iter.remove();
Review comment:
Are you sure this is correct? What if I will create an iterator and
immediately call `remove` on it?
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
+ *
+ * @param partId Partition id.
+ * @return Partition storage or {@code null}.
+ * @throws IllegalArgumentException If partition id is invalid.
+ */
+ PartitionStorage getPartition(int partId);
+
+ /**
+ * Destroys partition if it exists.
+ *
+ * @param partId Partition id.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition destruction.
+ */
+ void dropPartition(int partId) throws StorageException;
+
+ /**
+ * Returns table configuration.
Review comment:
```suggestion
* Returns the table configuration.
```
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -142,17 +154,17 @@ public RocksDbTableStorage(
.setWriteBufferManager(dataRegion.writeBufferManager())
);
+ maxPartitions = tableCfg.value().partitions();
+
+ partitions = new AtomicReferenceArray<>(maxPartitions);
+
try {
db = addToCloseableResources(RocksDB.open(dbOptions,
tablePath.toAbsolutePath().toString(), cfDescriptors, cfHandles));
}
catch (RocksDBException e) {
- throw new StorageException("Failed to initialize RocksDB
instance.", e);
+ throw new StorageException("Failed to initialize RocksDB instance.
" + System.currentTimeMillis(), e);
Review comment:
What is this for?
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -96,10 +98,10 @@
private ColumnFamilyHandle metaCfHandle;
/** Column families for partitions. Stored as an array for the quick
access by an index. */
- private AtomicReferenceArray<ColumnFamily> partitionCfs;
+ private AtomicReferenceArray<RocksDbPartitionStorage> partitions;
/** Max number of partitions in the table. */
- private int partitions;
+ private int maxPartitions;
Review comment:
Do we actually need this variable? Can it be replaced with
`partitions.length()`?
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -219,18 +232,65 @@ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
try {
ColumnFamilyHandle cfHandle =
db.createColumnFamily(cfDescriptor);
- partitionCf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+ ColumnFamily cf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+
+ partition = new RocksDbPartitionStorage(db, cf);
}
catch (RocksDBException e) {
cfDescriptor.getOptions().close();
throw new StorageException("Failed to create new RocksDB
column family " + handleName, e);
}
- partitionCfs.set(partId, partitionCf);
+ partitions.set(partId, partition);
+ }
+
+ return partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionStorage getPartition(int partId) {
+ checkPartitionId(partId);
+
+ return partitions.get(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dropPartition(int partId) throws StorageException {
+ checkPartitionId(partId);
+
+ RocksDbPartitionStorage partition = partitions.get(partId);
+
+ if (partition != null) {
+ partitions.set(partId, null);
+
+ ColumnFamily cf = partition.columnFamily();
+
+ ColumnFamilyHandle cfHandle = cf.handle();
+
+ try {
+ db.dropColumnFamily(cfHandle);
Review comment:
should we also call `destroyColumnFamilyHandle`?
```
One interesting thing: Even if ColumnFamilyHandle is pointing to a dropped
Column Family, you can continue using it. The data is actually deleted only
after you delete all outstanding ColumnFamilyHandles.
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
+ *
+ * @param partId Partition id.
+ * @return Partition storage or {@code null}.
+ * @throws IllegalArgumentException If partition id is invalid.
+ */
+ PartitionStorage getPartition(int partId);
+
+ /**
+ * Destroys partition if it exists.
+ *
+ * @param partId Partition id.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition destruction.
+ */
+ void dropPartition(int partId) throws StorageException;
+
+ /**
+ * Returns table configuration.
+ *
+ * @return Table configuration.
+ */
+ TableConfiguration configuration();
+
+ /**
+ * Returns data region containing tables data.
Review comment:
```suggestion
* Returns the data region containing table's data.
```
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -219,18 +232,65 @@ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
try {
ColumnFamilyHandle cfHandle =
db.createColumnFamily(cfDescriptor);
- partitionCf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+ ColumnFamily cf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+
+ partition = new RocksDbPartitionStorage(db, cf);
}
catch (RocksDBException e) {
cfDescriptor.getOptions().close();
throw new StorageException("Failed to create new RocksDB
column family " + handleName, e);
}
- partitionCfs.set(partId, partitionCf);
+ partitions.set(partId, partition);
+ }
+
+ return partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionStorage getPartition(int partId) {
+ checkPartitionId(partId);
+
+ return partitions.get(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dropPartition(int partId) throws StorageException {
+ checkPartitionId(partId);
+
+ RocksDbPartitionStorage partition = partitions.get(partId);
+
+ if (partition != null) {
+ partitions.set(partId, null);
+
+ ColumnFamily cf = partition.columnFamily();
+
+ ColumnFamilyHandle cfHandle = cf.handle();
+
+ try {
+ db.dropColumnFamily(cfHandle);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to srop partition", e);
Review comment:
```suggestion
throw new StorageException("Failed to stop partition", e);
```
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -184,13 +198,9 @@ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
/** {@inheritDoc} */
@Override public void stop() throws StorageException {
try {
- List<AutoCloseable> copy = new ArrayList<>(autoCloseables);
-
- Collections.reverse(copy);
-
IgniteUtils.closeAll(concat(
Review comment:
As I said before, I think that simply adding everything into a list and
then reversing it (or using a reverse iterator) might be a better choice here.
This code is more complicated than it needs to be.
##########
File path:
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -219,18 +232,65 @@ else if (handleName.startsWith(CF_PARTITION_PREFIX)) {
try {
ColumnFamilyHandle cfHandle =
db.createColumnFamily(cfDescriptor);
- partitionCf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+ ColumnFamily cf = new ColumnFamily(db, cfHandle, handleName,
cfDescriptor.getOptions(), null);
+
+ partition = new RocksDbPartitionStorage(db, cf);
}
catch (RocksDBException e) {
cfDescriptor.getOptions().close();
throw new StorageException("Failed to create new RocksDB
column family " + handleName, e);
}
- partitionCfs.set(partId, partitionCf);
+ partitions.set(partId, partition);
+ }
+
+ return partition;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PartitionStorage getPartition(int partId) {
+ checkPartitionId(partId);
+
+ return partitions.get(partId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dropPartition(int partId) throws StorageException {
+ checkPartitionId(partId);
+
+ RocksDbPartitionStorage partition = partitions.get(partId);
+
+ if (partition != null) {
+ partitions.set(partId, null);
+
+ ColumnFamily cf = partition.columnFamily();
+
+ ColumnFamilyHandle cfHandle = cf.handle();
+
+ try {
+ db.dropColumnFamily(cfHandle);
+ }
+ catch (RocksDBException e) {
+ throw new StorageException("Failed to srop partition", e);
+ }
}
+ }
- return new RocksDbPartitionStorage(db, partitionCf);
+ /**
+ * Checks that passed partition id is within proper bounds.
Review comment:
```suggestion
* Checks that a passed partition id is within the proper bounds.
```
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
##########
@@ -193,6 +198,45 @@ else if (mapper == null)
}
}
+ /**
+ * Converts iterator to stream.
Review comment:
```suggestion
* Converts an iterator to a stream.
```
##########
File path:
examples/src/test/java/org/apache/ignite/example/table/TableExamplesTest.java
##########
@@ -64,12 +67,7 @@ public void testKeyValueViewExample() throws Exception {
}
@BeforeEach
- private void startNode() throws IOException {
- Path workDir = Path.of("my-first-node-work");
-
- if (Files.exists(workDir))
- IgniteUtils.deleteIfExists(workDir);
-
+ private void startNode(@WorkDirectory Path workDir) throws IOException {
Review comment:
same here
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
##########
@@ -193,6 +198,45 @@ else if (mapper == null)
}
}
+ /**
+ * Converts iterator to stream.
+ *
+ * @param iterator Iterator.
+ * @param <T> Type of elements in iterator.
+ * @return Stream.
+ */
+ public static <T> Stream<T> iteratorToStream(Iterator<T> iterator) {
+ return stream(spliteratorUnknownSize(iterator, Spliterator.ORDERED),
false);
+ }
+
+ /**
+ * Returns iterator that iterates list in reverse order.
Review comment:
```suggestion
* Returns an iterator that iterates a list in reverse order.
```
##########
File path:
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
##########
@@ -25,12 +26,47 @@
*/
public interface TableStorage {
/**
- * Gets or creates a partition for current table.
+ * Retrieves or creates a partition for current table. Not expected to be
called concurrently with the same
+ * partition id.
*
* @param partId Partition id.
* @return Partition storage.
+ * @throws IllegalArgumentException If partition id is invalid.
+ * @throws StorageException If error occurred during partition creation.
*/
- PartitionStorage getOrCreatePartition(int partId);
+ PartitionStorage getOrCreatePartition(int partId) throws StorageException;
+
+ /**
+ * Returns partition storage or {@code null} if required storage doesn't
exist.
+ *
+ * @param partId Partition id.
+ * @return Partition storage or {@code null}.
+ * @throws IllegalArgumentException If partition id is invalid.
Review comment:
What does `partition id is invalid` mean?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]