ygerzhedovich commented on code in PR #1469:
URL: https://github.com/apache/ignite-3/pull/1469#discussion_r1085174737
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -119,282 +91,122 @@ public IndexScanNode(
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
- super(ctx);
+ super(ctx, rowFactory, schemaTable, filters, rowTransformer,
requiredColumns);
assert !nullOrEmpty(parts);
-
- assert ctx.transaction() != null || ctx.transactionTime() != null :
"Transaction not initialized.";
+ assert rangeConditions == null || rangeConditions.size() > 0;
this.schemaIndex = schemaIndex;
this.parts = parts;
- this.filters = filters;
- this.rowTransformer = rowTransformer;
this.requiredColumns = requiredColumns;
this.rangeConditions = rangeConditions;
this.comp = comp;
this.factory = rowFactory;
- rangeConditionIterator = rangeConditions == null ? null :
rangeConditions.iterator();
-
- tableRowConverter = row -> schemaTable.toRow(context(), row, factory,
requiredColumns);
-
indexRowSchema =
RowConverter.createIndexRowSchema(schemaIndex.columns(),
schemaTable.descriptor());
}
/** {@inheritDoc} */
@Override
- public void request(int rowsCnt) throws Exception {
- assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
-
- checkState();
-
- requested = rowsCnt;
-
- if (!inLoop) {
- context().execute(this::push, this::onError);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void closeInternal() {
- super.closeInternal();
-
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- protected void rewindInternal() {
- requested = 0;
- waiting = 0;
- rangeConditionsProcessed = false;
-
+ protected Publisher<RowT> scan() {
if (rangeConditions != null) {
- rangeConditionIterator = rangeConditions.iterator();
- }
-
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void register(List<Node<RowT>> sources) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- protected Downstream<RowT> requestDownstream(int idx) {
- throw new UnsupportedOperationException();
- }
-
- private void push() throws Exception {
- if (isClosed()) {
- return;
- }
-
- checkState();
-
- if (requested > 0 && !inBuff.isEmpty()) {
- inLoop = true;
- try {
- while (requested > 0 && !inBuff.isEmpty()) {
- checkState();
-
- RowT row = inBuff.poll();
-
- if (filters != null && !filters.test(row)) {
- continue;
- }
+ Publisher<RowT>[] conditionPublishers = new
Publisher[rangeConditions.size()];
- if (rowTransformer != null) {
- row = rowTransformer.apply(row);
- }
+ Iterator<RangeCondition<RowT>> it = rangeConditions.iterator();
- requested--;
- downstream().push(row);
- }
- } finally {
- inLoop = false;
- }
- }
-
- if (requested > 0) {
- if (waiting == 0 || activeSubscription == null) {
- requestNextBatch();
- }
- }
-
- if (requested > 0 && waiting == NOT_WAITING) {
- if (inBuff.isEmpty()) {
- requested = 0;
- downstream().end();
- } else {
- context().execute(this::push, this::onError);
- }
- }
- }
-
- private void requestNextBatch() {
- if (waiting == NOT_WAITING) {
- return;
- }
-
- if (waiting == 0) {
- // we must not request rows more than inBufSize
- waiting = inBufSize - inBuff.size();
- }
-
- Subscription subscription = this.activeSubscription;
- if (subscription != null) {
- subscription.request(waiting);
- } else if (!rangeConditionsProcessed) {
- RangeCondition<RowT> cond = null;
-
- if (rangeConditionIterator == null ||
!rangeConditionIterator.hasNext()) {
- rangeConditionsProcessed = true;
- } else {
- cond = rangeConditionIterator.next();
-
- rangeConditionsProcessed = !rangeConditionIterator.hasNext();
+ int i = 0;
+ while (it.hasNext()) {
+ conditionPublishers[i++] = indexPublisher(parts, it.next());
}
- indexPublisher(parts, cond).subscribe(new SubscriberImpl());
+ return SubscriptionUtils.concat(conditionPublishers);
} else {
- waiting = NOT_WAITING;
+ return indexPublisher(parts, null);
}
}
private Publisher<RowT> indexPublisher(int[] parts, @Nullable
RangeCondition<RowT> cond) {
Review Comment:
thanks, fixed
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -119,282 +91,122 @@ public IndexScanNode(
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
- super(ctx);
+ super(ctx, rowFactory, schemaTable, filters, rowTransformer,
requiredColumns);
assert !nullOrEmpty(parts);
-
- assert ctx.transaction() != null || ctx.transactionTime() != null :
"Transaction not initialized.";
+ assert rangeConditions == null || rangeConditions.size() > 0;
this.schemaIndex = schemaIndex;
this.parts = parts;
- this.filters = filters;
- this.rowTransformer = rowTransformer;
this.requiredColumns = requiredColumns;
this.rangeConditions = rangeConditions;
this.comp = comp;
this.factory = rowFactory;
- rangeConditionIterator = rangeConditions == null ? null :
rangeConditions.iterator();
-
- tableRowConverter = row -> schemaTable.toRow(context(), row, factory,
requiredColumns);
-
indexRowSchema =
RowConverter.createIndexRowSchema(schemaIndex.columns(),
schemaTable.descriptor());
}
/** {@inheritDoc} */
@Override
- public void request(int rowsCnt) throws Exception {
- assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
-
- checkState();
-
- requested = rowsCnt;
-
- if (!inLoop) {
- context().execute(this::push, this::onError);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void closeInternal() {
- super.closeInternal();
-
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- protected void rewindInternal() {
- requested = 0;
- waiting = 0;
- rangeConditionsProcessed = false;
-
+ protected Publisher<RowT> scan() {
if (rangeConditions != null) {
- rangeConditionIterator = rangeConditions.iterator();
- }
-
- if (activeSubscription != null) {
- activeSubscription.cancel();
-
- activeSubscription = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void register(List<Node<RowT>> sources) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- protected Downstream<RowT> requestDownstream(int idx) {
- throw new UnsupportedOperationException();
- }
-
- private void push() throws Exception {
- if (isClosed()) {
- return;
- }
-
- checkState();
-
- if (requested > 0 && !inBuff.isEmpty()) {
- inLoop = true;
- try {
- while (requested > 0 && !inBuff.isEmpty()) {
- checkState();
-
- RowT row = inBuff.poll();
-
- if (filters != null && !filters.test(row)) {
- continue;
- }
+ Publisher<RowT>[] conditionPublishers = new
Publisher[rangeConditions.size()];
- if (rowTransformer != null) {
- row = rowTransformer.apply(row);
- }
+ Iterator<RangeCondition<RowT>> it = rangeConditions.iterator();
- requested--;
- downstream().push(row);
- }
- } finally {
- inLoop = false;
- }
- }
-
- if (requested > 0) {
- if (waiting == 0 || activeSubscription == null) {
- requestNextBatch();
- }
- }
-
- if (requested > 0 && waiting == NOT_WAITING) {
- if (inBuff.isEmpty()) {
- requested = 0;
- downstream().end();
- } else {
- context().execute(this::push, this::onError);
- }
- }
- }
-
- private void requestNextBatch() {
- if (waiting == NOT_WAITING) {
- return;
- }
-
- if (waiting == 0) {
- // we must not request rows more than inBufSize
- waiting = inBufSize - inBuff.size();
- }
-
- Subscription subscription = this.activeSubscription;
- if (subscription != null) {
- subscription.request(waiting);
- } else if (!rangeConditionsProcessed) {
- RangeCondition<RowT> cond = null;
-
- if (rangeConditionIterator == null ||
!rangeConditionIterator.hasNext()) {
- rangeConditionsProcessed = true;
- } else {
- cond = rangeConditionIterator.next();
-
- rangeConditionsProcessed = !rangeConditionIterator.hasNext();
+ int i = 0;
+ while (it.hasNext()) {
+ conditionPublishers[i++] = indexPublisher(parts, it.next());
}
- indexPublisher(parts, cond).subscribe(new SubscriberImpl());
+ return SubscriptionUtils.concat(conditionPublishers);
Review Comment:
thanks, fixed
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java:
##########
@@ -227,8 +227,25 @@ public void tearDown(TestInfo testInfo) throws Exception {
tearDownBase(testInfo);
}
+ /**
+ * Starts executing the query and checks all asserts passed to the builder.
Review Comment:
reworked
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]