korlov42 commented on code in PR #2848:
URL: https://github.com/apache/ignite-3/pull/2848#discussion_r1402241660
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +341,69 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+
+ CatalogTableDescriptor tblDesc =
Objects.requireNonNull(catalog.table(tableId), "table");
+
+ CatalogZoneDescriptor zoneDesc =
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+ int partitions = zoneDesc.partitions();
Review Comment:
Does it make sense to put the partition count into IgniteTable to avoid going
to the catalog all the time?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -858,10 +858,14 @@ private void enlist(int tableId, List<NodeWithTerm>
assignments) {
tx.assignCommitPartition(new TablePartitionId(tableId,
ThreadLocalRandom.current().nextInt(partsCnt)));
for (int p = 0; p < partsCnt; p++) {
- NodeWithTerm enlistmentToken = assignments.get(p);
+ TablePartitionId tablePartId = new
TablePartitionId(tableId, p);
- tx.enlist(new TablePartitionId(tableId, p),
- new
IgniteBiTuple<>(topSrvc.getByConsistentId(enlistmentToken.name()),
enlistmentToken.term()));
+ if (tx.enlistedNodeAndTerm(tablePartId) == null) {
Review Comment:
Well, if a transaction can't handle a change of primary replica, then it must
fail. There is no point in a condition like this one, because the fragment was
mapped to a particular node, and that node will be used to execute the fragment
no matter what. Thus, I believe it's better to detect such changes in the
`enlist` method and process them accordingly.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +341,69 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+
+ CatalogTableDescriptor tblDesc =
Objects.requireNonNull(catalog.table(tableId), "table");
+
+ CatalogZoneDescriptor zoneDesc =
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+ int partitions = zoneDesc.partitions();
+
+ List<CompletableFuture<PrimaryReplica>> result = new
ArrayList<>(partitions);
+
+ HybridTimestamp clockNow = clock.now();
+
+ // no need to wait all partitions after pruning was implemented.
+ for (int partId = 0; partId < partitions; ++partId) {
+ int partitionId = partId;
+ ReplicationGroupId partGroupId = new TablePartitionId(tableId,
partitionId);
+
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
+ partGroupId,
+ clockNow,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
+
+ result.add(f.handle((primaryReplica, e) -> {
+ if (e != null) {
+ LOG.error("Failed to retrieve primary replica for
partition {}", e, partitionId);
Review Comment:
The log level should be either debug or trace.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -261,7 +285,7 @@ public synchronized void start() {
@Override
public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
return tableManager.tableAsync(table.id())
Review Comment:
It seems we don't need to go to tableManager anymore.
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -317,6 +341,69 @@ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory f
services.forEach(LifecycleAware::start);
}
+ /** Get primary replicas. */
+ private CompletableFuture<List<PrimaryReplica>> primaryReplicas(int
tableId) {
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ Catalog catalog = catalogManager.catalog(catalogVersion);
+
+ CatalogTableDescriptor tblDesc =
Objects.requireNonNull(catalog.table(tableId), "table");
+
+ CatalogZoneDescriptor zoneDesc =
Objects.requireNonNull(catalog.zone(tblDesc.zoneId()), "zone");
+
+ int partitions = zoneDesc.partitions();
+
+ List<CompletableFuture<PrimaryReplica>> result = new
ArrayList<>(partitions);
+
+ HybridTimestamp clockNow = clock.now();
+
+ // no need to wait all partitions after pruning was implemented.
+ for (int partId = 0; partId < partitions; ++partId) {
+ int partitionId = partId;
+ ReplicationGroupId partGroupId = new TablePartitionId(tableId,
partitionId);
+
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
+ partGroupId,
+ clockNow,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
+
+ result.add(f.handle((primaryReplica, e) -> {
+ if (e != null) {
+ LOG.error("Failed to retrieve primary replica for
partition {}", e, partitionId);
+
+ throw withCause(IgniteInternalException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
+ + " [tablePartitionId=" + partGroupId + ']', e);
+ }
+
+ return primaryReplica;
+ })
+ .thenApply(primaryReplica -> {
+ String holder = primaryReplica.getLeaseholder();
+
+ assert holder != null : "Unable to map query, nothing
holds the lease";
+
+ ClusterNode node =
clusterSrvc.topologyService().getByConsistentId(holder);
+
+ if (node == null) {
+ // additional recovery logic is need to be present
around here.
+ throw new IgniteInternalException(Sql.MAPPING_ERR,
"Unable to map query, node is lost or offline");
+ }
+
+ return new PrimaryReplica(node,
primaryReplica.getStartTime().longValue());
Review Comment:
why do you need `PrimaryReplica`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]