sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r710373143
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -173,63 +273,83 @@ public TableManager(
* @param name Table name.
* @param tblId Table id.
* @param assignment Affinity assignment.
- * @param schemaReg Schema registry for the table.
*/
private void createTableLocally(
String name,
- UUID tblId,
+ IgniteUuid tblId,
List<List<ClusterNode>> assignment,
- SchemaRegistry schemaReg
+ SchemaDescriptor schemaDesc
) {
int partitions = assignment.size();
var partitionsGroupsFutures = new
ArrayList<CompletableFuture<RaftGroupService>>();
- Path storageDir = partitionsStoreDir.resolve(name);
+ IntStream.range(0, partitions).forEach(p ->
+ partitionsGroupsFutures.add(
+ raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, p),
+ assignment.get(p),
+ () -> {
+ Path storageDir = partitionsStoreDir.resolve(name);
+
+ try {
+ Files.createDirectories(storageDir);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException(
+ "Failed to create partitions store directory
for " + name + ": " + e.getMessage(),
+ e
+ );
+ }
+
+ return new PartitionListener(
+ new RocksDbStorage(
+ storageDir.resolve(String.valueOf(p)),
+ ByteBuffer::compareTo
+ )
+ );
+ }
+ )
+ )
+ );
- try {
- Files.createDirectories(storageDir);
- } catch (IOException e) {
- throw new IgniteInternalException(
- "Failed to create partitions store directory for " + name + ":
" + e.getMessage(),
- e
- );
- }
+
CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(()
-> {
+ try {
+ HashMap<Integer, RaftGroupService> partitionMap = new
HashMap<>(partitions);
- for (int p = 0; p < partitions; p++) {
- RocksDbStorage storage = new RocksDbStorage(
- storageDir.resolve(String.valueOf(p)),
- ByteBuffer::compareTo
- );
-
- partitionsGroupsFutures.add(raftMgr.prepareRaftGroup(
- raftGroupName(tblId, p),
- assignment.get(p),
- new PartitionListener(storage)
- ));
- }
+ for (int p = 0; p < partitions; p++) {
+ CompletableFuture<RaftGroupService> future =
partitionsGroupsFutures.get(p);
-
CompletableFuture.allOf(partitionsGroupsFutures.toArray(CompletableFuture[]::new)).thenRun(()
-> {
- HashMap<Integer, RaftGroupService> partitionMap = new
HashMap<>(partitions);
+ assert future.isDone();
- for (int p = 0; p < partitions; p++) {
- CompletableFuture<RaftGroupService> future =
partitionsGroupsFutures.get(p);
+ RaftGroupService service = future.join();
- assert future.isDone();
+ partitionMap.put(p, service);
+ }
- RaftGroupService service = future.join();
+ InternalTableImpl internalTable = new InternalTableImpl(name,
tblId, partitionMap, partitions);
- partitionMap.put(p, service);
- }
+ var schemaRegistry = new SchemaRegistryImpl(v -> schemaDesc);
- InternalTableImpl internalTable = new InternalTableImpl(name,
tblId, partitionMap, partitions);
+ schemaRegistry.onSchemaRegistered(schemaDesc);
+
+ var table = new TableImpl(
+ internalTable,
+ schemaRegistry,
+ TableManager.this,
+ null
+ );
- var table = new TableImpl(internalTable, schemaReg, this, null);
+ tables.put(name, table);
+ tablesById.put(tblId, table);
- tables.put(name, table);
- tablesById.put(table.tableId(), table);
+ fireEvent(TableEvent.CREATE, new TableEventParameters(table),
null);
- onEvent(TableEvent.CREATE, new TableEventParameters(table), null);
+ Optional.ofNullable(createTblIntention.get(tblId)).ifPresent(f
-> f.complete(table));
+ }
+ catch (Exception e) {
Review comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]