AMashenkov commented on code in PR #843:
URL: https://github.com/apache/ignite-3/pull/843#discussion_r892077003
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java:
##########
@@ -245,36 +258,304 @@ public CompletableFuture<AsyncSqlCursor<List<Object>>>
querySingleAsync(QueryCon
}
}
+ @Override
+ public CompletableFuture<long[]> batchAsync(QueryContext context, String
schemaName, String qry, List<List<Object>> batchArgs) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(new NodeStoppingException());
+ }
+
+ try {
+ QueryProperties props =
context.maybeUnwrap(QueryProperties.class).orElse(QueryProperties.EMPTY_PROPERTIES);
+
+ if (props.booleanValue(SessionProperties.BATCH_PARALLEL, false)) {
+ return batchParallel(context, schemaName, qry, batchArgs);
+ } else {
+ return batchSerial(context, schemaName, qry, batchArgs);
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
private CompletableFuture<AsyncSqlCursor<List<Object>>> querySingle0(
+ final QueryContext context,
+ final String schemaName,
+ final String sql,
+ final Object... params
+ ) {
+ try {
+ BaseQueryContext ctx = baseContext(schemaName, params);
+
+ CompletableFuture<Void> start = new CompletableFuture<>();
+
+ CompletableFuture<QueryPlan> planning = planningSingle(
+ start,
+ ctx,
+ context,
+ schemaName,
+ sql
+ );
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> stage =
planning.thenApply(plan -> {
+ context.maybeUnwrap(QueryValidator.class)
+ .ifPresent(queryValidator ->
queryValidator.validatePlan(plan));
+
+ return new AsyncSqlCursorImpl<>(
+ SqlQueryType.mapPlanTypeToSqlType(plan.type()),
+ plan.metadata(),
+ executionSrvc.executePlan(plan, ctx)
+ );
+ });
+
+ stage.whenComplete((cur, ex) -> {
+ if (ex instanceof CancellationException) {
+ ctx.cancel().cancel();
+ }
+ });
+
+ start.completeAsync(() -> null, taskExecutor);
+
+ return stage;
+ } catch (Exception ex) {
+ return CompletableFuture.failedFuture(ex);
+ }
+ }
+
+ private CompletableFuture<long[]> batchSerial(
QueryContext context,
String schemaName,
String sql,
- Object... params) {
+ List<List<Object>> batchArgs
+ ) {
+ if (CollectionUtils.nullOrEmpty(batchArgs)) {
+ return CompletableFuture.failedFuture(new IgniteException("Batch
is empty"));
+ }
+
+ try {
+ CompletableFuture<Void> start = new CompletableFuture<>();
+ SchemaPlus schema = schemaManager.schema(schemaName);
+
+ BaseQueryContext ctx = baseContext(schemaName,
batchArgs.get(0).toArray());
+
+ CompletableFuture<QueryPlan> planning = planningSingle(
+ start,
+ ctx,
+ context,
+ schemaName,
+ sql
+ );
+
+ final var counters = new LongArrayList(batchArgs.size());
+ CompletableFuture<Void> tail =
CompletableFuture.completedFuture(null);
+ ArrayList<CompletableFuture<Void>> batchFuts = new
ArrayList<>(batchArgs.size());
+
+ for (int i = 0; i < batchArgs.size(); ++i) {
+ List<Object> batch = batchArgs.get(i);
+
+ final BaseQueryContext ctx0 = BaseQueryContext.builder()
+ .cancel(new QueryCancel())
+ .frameworkConfig(
+ Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
+ .defaultSchema(schema)
+ .build()
+ )
+ .logger(LOG)
+ .parameters(batch.toArray())
+ .build();
+
+ tail = tail.thenCombine(planning, (updCntr, plan) -> new
AsyncSqlCursorImpl<>(
+ SqlQueryType.mapPlanTypeToSqlType(plan.type()),
+ plan.metadata(),
+ executionSrvc.executePlan(plan, ctx0)
+ ))
+ .thenCompose(cur -> cur.requestNextAsync(1))
+ .thenAccept(page -> {
+ if (page == null
+ || page.items() == null
+ || page.items().size() != 1
+ || page.items().get(0).size() != 1
+ || page.hasMore()) {
+ throw new SqlException("Invalid DML results: "
+ page);
+ }
Review Comment:
Let's extract this to a separate method 'validateDmlResult'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]