This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 066fd5657f [ASTERIXDB-3170][COMP] Support COPY Statement 066fd5657f is described below commit 066fd5657f2a2a89de54c47b6d62facc858175ed Author: Peeyush Gupta <peeyush.gu...@couchbase.com> AuthorDate: Sun Apr 30 13:03:00 2023 -0700 [ASTERIXDB-3170][COMP] Support COPY Statement - user model changes: yes - storage format changes: no - interface changes: no Details: With this change, we add support for the COPY statement, which can be used to upsert data into a dataset from an external data source. Change-Id: I612978472f090ab3c32e901aa37087ed5b7edf92 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17499 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Peeyush Gupta <peeyush.gu...@couchbase.com> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- .../base/ILangExpressionToPlanTranslator.java | 4 +- .../asterix/translator/CompiledStatements.java | 44 ++++++++++ .../translator/LangExpressionToPlanTranslator.java | 94 +++++++++++++++------- .../apache/asterix/api/common/APIFramework.java | 13 +-- .../asterix/app/translator/QueryTranslator.java | 59 ++++++++++++++ .../queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp | 44 ++++++++++ .../copy/copy-1/copy-1.2.update.sqlpp | 65 +++++++++++++++ .../queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp | 23 ++++++ .../copy/copy-2/copy-2.0.container.sqlpp | 20 +++++ .../queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp | 25 ++++++ .../copy/copy-2/copy-2.2.update.sqlpp | 52 ++++++++++++ .../queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp | 23 ++++++ .../runtimets/results/copy/copy-1/copy-1.1.adm | 1 + .../runtimets/results/copy/copy-2/copy-2.1.adm | 1 + .../runtimets/testsuite_external_dataset_s3.xml | 7 ++ .../test/resources/runtimets/testsuite_sqlpp.xml | 7 ++ .../apache/asterix/lang/common/base/Statement.java | 1 + .../lang/common/statement/CopyStatement.java | 90 
+++++++++++++++++++++ .../lang/common/visitor/FormatPrintVisitor.java | 11 +++ .../base/AbstractQueryExpressionVisitor.java | 6 ++ .../lang/common/visitor/base/ILangVisitor.java | 3 + .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj | 34 +++++++- 22 files changed, 591 insertions(+), 36 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java index fea93400f1..0116576cde 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java @@ -51,10 +51,10 @@ public interface ILangExpressionToPlanTranslator { * * @param stmt, * the compiled load statement. - * @return a logical query plan for the load statement. + * @return a logical query plan for the Copy/Load statement. * @throws AlgebricksException */ - public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException; + public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException; /** * @return the current minimum available variable id. 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java index 088676c520..4981f0ef04 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java @@ -286,6 +286,50 @@ public class CompiledStatements { } } + public static class CompiledCopyFromFileStatement extends AbstractCompiledStatement + implements ICompiledDmlStatement { + private final DataverseName dataverseName; + private final String datasetName; + private final String adapter; + private final Map<String, String> properties; + + public CompiledCopyFromFileStatement(DataverseName dataverseName, String datasetName, String adapter, + Map<String, String> properties) { + this.dataverseName = dataverseName; + this.datasetName = datasetName; + this.adapter = adapter; + this.properties = properties; + } + + @Override + public DataverseName getDataverseName() { + return dataverseName; + } + + @Override + public String getDatasetName() { + return datasetName; + } + + public String getAdapter() { + return adapter; + } + + public Map<String, String> getProperties() { + return properties; + } + + @Override + public Statement.Kind getKind() { + return Statement.Kind.COPY; + } + + @Override + public byte getCategory() { + return Statement.Category.UPDATE; + } + } + public static class CompiledInsertStatement extends AbstractCompiledStatement implements ICompiledDmlStatement { private final DataverseName dataverseName; private final String datasetName; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index bd06729885..52cddc50f5 100644 --- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -44,6 +44,7 @@ import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Expression.Kind; import org.apache.asterix.lang.common.base.ILangExpression; +import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.clause.GroupbyClause; import org.apache.asterix.lang.common.clause.LetClause; import org.apache.asterix.lang.common.clause.LimitClause; @@ -99,6 +100,7 @@ import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement; @@ -194,15 +196,13 @@ abstract class LangExpressionToPlanTranslator return context.getVarCounter(); } - @Override - public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException { - CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement) stmt; + public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException { SourceLocation sourceLoc = stmt.getSourceLocation(); - Dataset dataset = metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName()); + Dataset dataset = metadataProvider.findDataset(stmt.getDataverseName(), stmt.getDatasetName()); if (dataset == null) { // This would never happen since we check for this in AqlTranslator - throw new 
CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, clffs.getDatasetName(), - clffs.getDataverseName()); + throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, stmt.getDatasetName(), + stmt.getDataverseName()); } IAType itemType = metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); IAType metaItemType = @@ -219,7 +219,18 @@ abstract class LangExpressionToPlanTranslator LoadableDataSource lds; try { - lds = new LoadableDataSource(dataset, itemType, metaItemType, clffs.getAdapter(), clffs.getProperties()); + if (stmt.getKind() == Statement.Kind.LOAD) { + lds = new LoadableDataSource(dataset, itemType, metaItemType, + ((CompiledLoadFromFileStatement) stmt).getAdapter(), + ((CompiledLoadFromFileStatement) stmt).getProperties()); + } else if (stmt.getKind() == Statement.Kind.COPY) { + lds = new LoadableDataSource(dataset, itemType, metaItemType, + ((CompiledCopyFromFileStatement) stmt).getAdapter(), + ((CompiledCopyFromFileStatement) stmt).getProperties()); + } else { + throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type", + stmt.getKind()); + } } catch (IOException e) { throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, e.toString(), e); } @@ -258,14 +269,16 @@ abstract class LangExpressionToPlanTranslator assign.getInputs().add(new MutableObject<>(dssOp)); assign.setSourceLocation(sourceLoc); - // If the input is pre-sorted, we set the ordering property explicitly in the - // assign - if (clffs.alreadySorted()) { - List<OrderColumn> orderColumns = new ArrayList<>(); - for (int i = 0; i < pkVars.size(); ++i) { - orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC)); + if (stmt.getKind() == Statement.Kind.LOAD) { + // If the input is pre-sorted, we set the ordering property explicitly in the + // assign + if (((CompiledLoadFromFileStatement) stmt).alreadySorted()) { + List<OrderColumn> orderColumns = new 
ArrayList<>(); + for (int i = 0; i < pkVars.size(); ++i) { + orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC)); + } + assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns)); } - assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns)); } // Load does not support meta record now. @@ -285,22 +298,49 @@ abstract class LangExpressionToPlanTranslator additionalFilteringAssign.setSourceLocation(sourceLoc); } - InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef, - varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true); - insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); - insertOp.setSourceLocation(sourceLoc); + if (stmt.getKind() == Statement.Kind.LOAD) { + InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef, + varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true); + insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); + insertOp.setSourceLocation(sourceLoc); + + if (additionalFilteringAssign != null) { + additionalFilteringAssign.getInputs().add(new MutableObject<>(assign)); + insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign)); + } else { + insertOp.getInputs().add(new MutableObject<>(assign)); + } + + SinkOperator leafOperator = new SinkOperator(); + leafOperator.getInputs().add(new MutableObject<>(insertOp)); + leafOperator.setSourceLocation(sourceLoc); + return new ALogicalPlanImpl(new MutableObject<>(leafOperator)); + } else if (stmt.getKind() == Statement.Kind.COPY) { + InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef, + varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false); + upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions); + upsertOp.setSourceLocation(sourceLoc); - if (additionalFilteringAssign != null) { - 
additionalFilteringAssign.getInputs().add(new MutableObject<>(assign)); - insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign)); + if (additionalFilteringAssign != null) { + additionalFilteringAssign.getInputs().add(new MutableObject<>(assign)); + upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign)); + } else { + upsertOp.getInputs().add(new MutableObject<>(assign)); + } + upsertOp.setOperationVar(context.newVar()); + upsertOp.setOperationVarType(BuiltinType.AINT8); + // Create and add a new variable used for representing the original record + upsertOp.setPrevRecordVar(context.newVar()); + upsertOp.setPrevRecordType(itemType); + + DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(true)); + delegateOperator.getInputs().add(new MutableObject<>(upsertOp)); + delegateOperator.setSourceLocation(sourceLoc); + return new ALogicalPlanImpl(new MutableObject<>(delegateOperator)); } else { - insertOp.getInputs().add(new MutableObject<>(assign)); + throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type", + stmt.getKind()); } - - SinkOperator leafOperator = new SinkOperator(); - leafOperator.getInputs().add(new MutableObject<>(insertOp)); - leafOperator.setSourceLocation(sourceLoc); - return new ALogicalPlanImpl(new MutableObject<>(leafOperator)); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 41be44b153..afdcee40dc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -192,6 +192,7 @@ public class APIFramework { // establish facts final boolean isQuery = query != null; final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD; + final boolean isCopy = 
statement != null && statement.getKind() == Statement.Kind.COPY; final SourceLocation sourceLoc = query != null ? query.getSourceLocation() : statement != null ? statement.getSourceLocation() : null; final boolean isExplainOnly = isQuery && query.isExplain(); @@ -207,8 +208,8 @@ public class APIFramework { ILangExpressionToPlanTranslator t = translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter, externalVars); ResultMetadata resultMetadata = new ResultMetadata(output.config().fmt()); - ILogicalPlan plan = - isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement, resultMetadata); + ILogicalPlan plan = isLoad || isCopy ? t.translateCopyOrLoad(statement) + : t.translate(query, outputDatasetName, statement, resultMetadata); ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext(); CompilerProperties compilerProperties = ccAppContext.getCompilerProperties(); @@ -233,7 +234,7 @@ public class APIFramework { builder.setWarningCollector(warningCollector); builder.setMaxWarnings(conf.getMaxWarnings()); - if ((isQuery || isLoad) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) + if ((isQuery || isLoad || isCopy) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) { generateLogicalPlan(plan, output.config().getPlanFormat(), cboMode); } @@ -272,7 +273,7 @@ public class APIFramework { PlanPrettyPrinter.printPhysicalOps(plan, buf, 0, true); output.out().write(buf.toString()); } else { - if (isQuery || isLoad) { + if (isQuery || isLoad || isCopy) { generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode); } } @@ -299,7 +300,7 @@ public class APIFramework { } if (!conf.isGenerateJobSpec()) { - if (isQuery || isLoad) { + if (isQuery || isLoad || isCopy) { generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode); } return null; @@ -324,7 +325,7 @@ public class APIFramework { } if 
(conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) { - if (isQuery || isLoad) { + if (isQuery || isLoad || isCopy) { generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(), cboMode); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f48e2b8bc4..43218d0552 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -112,6 +112,7 @@ import org.apache.asterix.lang.common.statement.AnalyzeDropStatement; import org.apache.asterix.lang.common.statement.AnalyzeStatement; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; +import org.apache.asterix.lang.common.statement.CopyStatement; import org.apache.asterix.lang.common.statement.CreateAdapterStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; @@ -209,6 +210,7 @@ import org.apache.asterix.runtime.operators.DatasetStreamStats; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.ClientRequest; +import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; @@ -436,6 +438,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } 
handleLoadStatement(metadataProvider, stmt, hcc); break; + case COPY: + if (stats.getProfileType() == Stats.ProfileType.FULL) { + this.jobFlags.add(JobFlag.PROFILE_RUNTIME); + } + handleCopyStatement(metadataProvider, stmt, hcc); + break; case INSERT: case UPSERT: if (((InsertStatement) stmt).getReturnExpression() != null) { @@ -3442,6 +3450,57 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } + protected Map<String, String> createExternalDataPropertiesForCopyStmt(DataverseName dataverseName, + CopyStatement copyStatement, Datatype itemType, MetadataTransactionContext mdTxnCtx) + throws AlgebricksException { + return copyStatement.getExternalDetails().getProperties(); + } + + protected void handleCopyStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc) + throws Exception { + CopyStatement copyStmt = (CopyStatement) stmt; + String datasetName = copyStmt.getDatasetName(); + metadataProvider.validateDatabaseObjectName(copyStmt.getDataverseName(), datasetName, + copyStmt.getSourceLocation()); + DataverseName dataverseName = getActiveDataverseName(copyStmt.getDataverseName()); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + boolean bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); + lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName); + try { + metadataProvider.setWriteTransaction(true); + Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName()); + Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), + dataset.getItemTypeName()); + + ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails(); + Map<String, String> properties = + createExternalDataPropertiesForCopyStmt(dataverseName, copyStmt, itemType, mdTxnCtx); + ExternalDataUtils.normalize(properties); + ExternalDataUtils.validate(properties); 
+ validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx, + appCtx); + CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(dataverseName, + copyStmt.getDatasetName(), externalDetails.getAdapter(), properties); + cls.setSourceLocation(stmt.getSourceLocation()); + JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, + null, responsePrinter, warningCollector, null); + afterCompile(); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + bActiveTxn = false; + if (spec != null && !isCompileOnly()) { + runJob(hcc, spec); + } + } catch (Exception e) { + if (bActiveTxn) { + abort(e, e, mdTxnCtx); + } + throw e; + } finally { + metadataProvider.getLocks().unlock(); + } + } + public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp new file mode 100644 index 0000000000..e69b33bd8c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + create dataverse test if not exists; + use test; + +create type AddressType as open { + number: int64, + street: string, + city: string +}; + +create type CustomerType as closed { + cid: int64, + name: string, + cashBack: int64, + age: int64?, + address: AddressType?, + lastorder: { + oid: int64, + total: float + } +}; + +create dataset Customers(CustomerType) primary key cid; + + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp new file mode 100644 index 0000000000..002d43d950 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use test; + +upsert into Customers([ + { + "cid": 1, + "name": "Jodi Rotruck", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + }, + { + "cid": 1000, + "name": "ABC", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + } +]); + +copy Customers +using localfs +(("path"="asterix_nc1://data/nontagged/customerData.json"),("format"="adm")); + +upsert into Customers([ + { + "cid": 1, + "name": "Jodi Rotruck", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + }, + { + "cid": 1000, + "name": "Jodi Rotruck", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + }, + { + "cid": 4, + "name": "ABC", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + }, + { + "cid": 1001, + "name": "XYZ", + "cashBack": 100, + "lastorder": { "oid": 66, "total": 38.618626f } + } +]); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp new file mode 100644 index 0000000000..e5a896beec --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + use test; + + select value count(*) from + Customers; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp new file mode 100644 index 0000000000..ecbbe43a76 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +// create container with data +playground data_dir data/json/single-line/20-records.json \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp new file mode 100644 index 0000000000..d44645c82e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop dataset test1 if exists; +create dataset test1 primary key (id: int); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp new file mode 100644 index 0000000000..a974a62522 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +upsert into test1([ + { + "id": 1 + }, + { + "id": 1000 + } +]); + +copy test1 USING S3 ( +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="data_dir"), +("format"="json") +); + +upsert into test1([ + { + "id": 1 + }, + { + "id": 1000 + }, + { + "id": 2 + }, + { + "id": 1001 + } +]); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp new file mode 100644 index 0000000000..b72e741501 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use test; + +select count(*) `count` from test1; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm new file mode 100644 index 0000000000..c7930257df --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm @@ -0,0 +1 @@ +7 \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm new file mode 100644 index 0000000000..e5bcb837c9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm @@ -0,0 +1 @@ +{ "count": 22 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 724298479e..8db17984f1 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -479,4 +479,11 @@ </compilation-unit> </test-case> </test-group> + <test-group name="copy"> + <test-case FilePath="copy"> + <compilation-unit name="copy-2"> + <output-dir compare="Text">copy-2</output-dir> + </compilation-unit> + </test-case> + </test-group> </test-suite> diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 2792f91a9c..7bcb4732f6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -16274,4 +16274,11 @@ </compilation-unit> </test-case> </test-group> + <test-group name="copy"> + <test-case FilePath="copy"> + 
<compilation-unit name="copy-1"> + <output-dir compare="Text">copy-1</output-dir> + </compilation-unit> + </test-case> + </test-group> </test-suite> diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java index 05d53b17ce..1654118f33 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java @@ -111,5 +111,6 @@ public interface Statement extends ILangExpression { COMPACT, SUBSCRIBE_FEED, EXTENSION, + COPY } } diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java new file mode 100644 index 0000000000..3baf81ab97 --- /dev/null +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.lang.common.statement; + +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.lang.common.base.AbstractStatement; +import org.apache.asterix.lang.common.expression.RecordConstructor; +import org.apache.asterix.lang.common.util.ExpressionUtils; +import org.apache.asterix.lang.common.visitor.base.ILangVisitor; +import org.apache.asterix.object.base.AdmObjectNode; + +public class CopyStatement extends AbstractStatement { + + private DataverseName dataverseName; + private String datasetName; + private ExternalDetailsDecl externalDetails; + private AdmObjectNode withObjectNode; + + public CopyStatement(DataverseName dataverseName, String datasetName, ExternalDetailsDecl externalDetails, + RecordConstructor withRecord) throws CompilationException { + this.dataverseName = dataverseName; + this.datasetName = datasetName; + this.externalDetails = externalDetails; + this.withObjectNode = withRecord == null ? 
new AdmObjectNode() : ExpressionUtils.toNode(withRecord); + } + + public DataverseName getDataverseName() { + return dataverseName; + } + + public void setDataverseName(DataverseName dataverseName) { + this.dataverseName = dataverseName; + } + + @Override + public Kind getKind() { + return Kind.COPY; + } + + public String getDatasetName() { + return datasetName; + } + + public void setDatasetName(String datasetName) { + this.datasetName = datasetName; + } + + @Override + public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException { + return visitor.visit(this, arg); + } + + public ExternalDetailsDecl getExternalDetails() { + return externalDetails; + } + + public void setExternalDetails(ExternalDetailsDecl externalDetails) { + this.externalDetails = externalDetails; + } + + public AdmObjectNode getWithObjectNode() { + return withObjectNode; + } + + public void setWithObjectNode(AdmObjectNode withObjectNode) { + this.withObjectNode = withObjectNode; + } + + @Override + public byte getCategory() { + return Category.UPDATE; + } +} diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java index 74243989fb..23455f10f5 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java @@ -69,6 +69,7 @@ import org.apache.asterix.lang.common.statement.AnalyzeDropStatement; import org.apache.asterix.lang.common.statement.AnalyzeStatement; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; +import org.apache.asterix.lang.common.statement.CopyStatement; import org.apache.asterix.lang.common.statement.CreateAdapterStatement; import 
org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; @@ -544,6 +545,16 @@ public abstract class FormatPrintVisitor implements ILangVisitor<Void, Integer> return null; } + @Override + public Void visit(CopyStatement stmtCopy, Integer step) throws CompilationException { + out.print(skip(step) + "copy " + datasetSymbol + + generateFullName(stmtCopy.getDataverseName(), stmtCopy.getDatasetName()) + " using " + + revertStringToQuoted(stmtCopy.getExternalDetails().getAdapter()) + " "); + printConfiguration(stmtCopy.getExternalDetails().getProperties()); + out.println(); + return null; + } + @Override public Void visit(DropDatasetStatement del, Integer step) throws CompilationException { out.println( diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java index a7444e9187..accf47648f 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java @@ -29,6 +29,7 @@ import org.apache.asterix.lang.common.statement.AnalyzeDropStatement; import org.apache.asterix.lang.common.statement.AnalyzeStatement; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; +import org.apache.asterix.lang.common.statement.CopyStatement; import org.apache.asterix.lang.common.statement.CreateAdapterStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; @@ -116,6 +117,11 @@ public abstract class 
AbstractQueryExpressionVisitor<R, T> implements ILangVisit return null; } + @Override + public R visit(CopyStatement stmtCopy, T arg) throws CompilationException { + return null; + } + @Override public R visit(NodegroupDecl ngd, T arg) throws CompilationException { return null; diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java index 541567dddd..9ad247cb64 100644 --- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java +++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java @@ -47,6 +47,7 @@ import org.apache.asterix.lang.common.statement.AnalyzeDropStatement; import org.apache.asterix.lang.common.statement.AnalyzeStatement; import org.apache.asterix.lang.common.statement.CompactStatement; import org.apache.asterix.lang.common.statement.ConnectFeedStatement; +import org.apache.asterix.lang.common.statement.CopyStatement; import org.apache.asterix.lang.common.statement.CreateAdapterStatement; import org.apache.asterix.lang.common.statement.CreateDataverseStatement; import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement; @@ -101,6 +102,8 @@ public interface ILangVisitor<R, T> { R visit(LoadStatement stmtLoad, T arg) throws CompilationException; + R visit(CopyStatement stmtCopy, T arg) throws CompilationException; + R visit(DropDatasetStatement del, T arg) throws CompilationException; R visit(InsertStatement insert, T arg) throws CompilationException; diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj index 007e9afa08..01f72aa3cc 100644 --- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj +++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj @@ -159,6 +159,7 @@ import 
org.apache.asterix.lang.common.statement.FullTextConfigDropStatement; import org.apache.asterix.lang.common.statement.InsertStatement; import org.apache.asterix.lang.common.statement.InternalDetailsDecl; import org.apache.asterix.lang.common.statement.LoadStatement; +import org.apache.asterix.lang.common.statement.CopyStatement; import org.apache.asterix.lang.common.statement.NodeGroupDropStatement; import org.apache.asterix.lang.common.statement.NodegroupDecl; import org.apache.asterix.lang.common.statement.Query; @@ -929,6 +930,7 @@ Statement SingleStatement() throws ParseException: | stmt = FunctionDeclaration() | stmt = CreateStatement() | stmt = LoadStatement() + | stmt = CopyStatement() | stmt = DropStatement() | stmt = SetStatement() | stmt = InsertStatement() @@ -2714,6 +2716,36 @@ Statement SetStatement() throws ParseException: } } +CopyStatement CopyStatement() throws ParseException: +{ + Token startToken = null; + DataverseName dataverseName = null; + Identifier datasetName = null; + boolean alreadySorted = false; + String adapterName; + Map<String,String> properties; + Pair<DataverseName,Identifier> nameComponents = null; +} +{ + <COPY> (<INTO>)? 
{ startToken = token; } nameComponents = QualifiedName() + { + dataverseName = nameComponents.first; + datasetName = nameComponents.second; + } + <USING> adapterName = AdapterName() properties = Configuration() + { + ExternalDetailsDecl edd = new ExternalDetailsDecl(); + edd.setAdapter(adapterName); + edd.setProperties(properties); + try { + CopyStatement stmt = new CopyStatement(dataverseName, datasetName.getValue(), edd, null); + return addSourceLocation(stmt, startToken); + } catch (CompilationException e){ + throw new SqlppParseException(getSourceLocation(startToken), e.getMessage()); + } + } +} + LoadStatement LoadStatement() throws ParseException: { Token startToken = null; @@ -2743,7 +2775,6 @@ LoadStatement LoadStatement() throws ParseException: } } - String AdapterName() throws ParseException : { String adapterName = null; @@ -5709,6 +5740,7 @@ TOKEN [IGNORE_CASE]: | <WHERE : "where"> | <WITH : "with"> | <WRITE : "write"> + | <COPY : "copy"> } <DEFAULT,IN_DBL_BRACE>